Git Push Summary
Repository: spark Updated Branches: refs/heads/branch-1.2 [created] 76386e1a2
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.1.0-rc4 [deleted] 5918ea4c9
git commit: [EC2] Factor out Mesos spark-ec2 branch
Repository: spark Updated Branches: refs/heads/master 76386e1a2 - 2aca97c7c [EC2] Factor out Mesos spark-ec2 branch We reference a specific branch in two places. This patch makes it one place. Author: Nicholas Chammas nicholas.cham...@gmail.com Closes #3008 from nchammas/mesos-spark-ec2-branch and squashes the following commits: 10a6089 [Nicholas Chammas] factor out mess spark-ec2 branch Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aca97c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aca97c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aca97c7 Branch: refs/heads/master Commit: 2aca97c7cfdefea8b6f9dbb88951e9acdfd606d9 Parents: 76386e1 Author: Nicholas Chammas nicholas.cham...@gmail.com Authored: Mon Nov 3 09:02:35 2014 -0800 Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu Committed: Mon Nov 3 09:02:35 2014 -0800 -- ec2/spark_ec2.py | 11 +-- 1 file changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2aca97c7/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 0d6b82b..50f88f7 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -41,8 +41,9 @@ from boto import ec2 DEFAULT_SPARK_VERSION = 1.1.0 +MESOS_SPARK_EC2_BRANCH = v4 # A URL prefix from which to fetch AMI information -AMI_PREFIX = https://raw.github.com/mesos/spark-ec2/v2/ami-list; +AMI_PREFIX = https://raw.github.com/mesos/spark-ec2/{b}/ami-list.format(b=MESOS_SPARK_EC2_BRANCH) class UsageError(Exception): @@ -583,7 +584,13 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # NOTE: We should clone the repository before running deploy_files to # prevent ec2-variables.sh from being overwritten -ssh(master, opts, rm -rf spark-ec2 git clone https://github.com/mesos/spark-ec2.git -b v4) +ssh( +host=master, +opts=opts, +command=rm -rf spark-ec2 ++ ++ git clone https://github.com/mesos/spark-ec2.git -b {b}.format(b=MESOS_SPARK_EC2_BRANCH) +) print Deploying files to master... deploy_files(conn, deploy.generic, opts, master_nodes, slave_nodes, modules) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
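The change above amounts to moving the spark-ec2 branch name into a single module-level constant and interpolating it at both call sites. A minimal sketch of that pattern follows; the names mirror the patch, but this is a standalone illustration rather than the actual spark_ec2.py:

```python
# Sketch of the refactoring pattern: keep the spark-ec2 branch name in one
# module-level constant and interpolate it everywhere it is needed.
MESOS_SPARK_EC2_BRANCH = "v4"

# URL prefix from which to fetch AMI information, derived from the constant.
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(
    b=MESOS_SPARK_EC2_BRANCH)


def clone_command():
    """Build the git clone command for the same branch, so the two call
    sites can never drift apart."""
    return ("rm -rf spark-ec2 && "
            "git clone https://github.com/mesos/spark-ec2.git -b {b}".format(
                b=MESOS_SPARK_EC2_BRANCH))


if __name__ == "__main__":
    print(AMI_PREFIX)
    print(clone_command())
```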
git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample
Repository: spark Updated Branches: refs/heads/master 2aca97c7c - 3cca19622 [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1. ~~~ In [14]: import random In [15]: r1 = random.Random(10) In [16]: r1.randint(0, 1) Out[16]: 1 In [17]: r1.random() Out[17]: 0.4288890546751146 In [18]: r1.random() Out[18]: 0.5780913011344704 In [19]: r2 = random.Random(10) In [20]: r2.randint(0, 1) Out[20]: 1 In [21]: r2.randint(0, 1) Out[21]: 0 In [22]: r2.random() Out[22]: 0.5780913011344704 ~~~ Note: The new tests are not for this bug fix. Author: Xiangrui Meng m...@databricks.com Closes #3010 from mengxr/SPARK-4148 and squashes the following commits: 869ae4b [Xiangrui Meng] move tests tests.py c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cca1962 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cca1962 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cca1962 Branch: refs/heads/master Commit: 3cca1962207745814b9d83e791713c91b659c36c Parents: 2aca97c Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 12:24:24 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 12:24:24 2014 -0800 -- python/pyspark/rdd.py| 3 --- python/pyspark/rddsampler.py | 11 +-- python/pyspark/tests.py | 15 +++ 3 files changed, 20 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 550c9dd..4f025b9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -316,9 +316,6 @@ class RDD(object): Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable). 
- - sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP -[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] assert fraction = 0.0, Negative fraction value: %s % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rddsampler.py -- diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 528a181..f5c3cfd 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -40,14 +40,13 @@ class RDDSamplerBase(object): def initRandomGenerator(self, split): if self._use_numpy: import numpy -self._random = numpy.random.RandomState(self._seed) +self._random = numpy.random.RandomState(self._seed ^ split) else: -self._random = random.Random(self._seed) +self._random = random.Random(self._seed ^ split) -for _ in range(0, split): -# discard the next few values in the sequence to have a -# different seed for the different splits -self._random.randint(0, 2 ** 32 - 1) +# mixing because the initial seeds are close to each other +for _ in xrange(10): +self._random.randint(0, 1) self._split = split self._rand_initialized = True http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/tests.py -- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 37a1289..253a471 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase): self.assertEquals(result.getNumPartitions(), 5) self.assertEquals(result.count(), 3) +def test_sample(self): +rdd = self.sc.parallelize(range(0, 100), 4) +wo = rdd.sample(False, 0.1, 2).collect() +wo_dup = rdd.sample(False, 0.1, 2).collect() +self.assertSetEqual(set(wo), set(wo_dup)) +wr = rdd.sample(True, 0.2, 5).collect() +wr_dup = rdd.sample(True, 0.2, 5).collect() +self.assertSetEqual(set(wr), set(wr_dup)) +wo_s10 = rdd.sample(False, 0.3, 10).collect() +wo_s20 = rdd.sample(False, 0.3, 20).collect() +self.assertNotEqual(set(wo_s10), set(wo_s20)) +wr_s11 = rdd.sample(True, 0.4, 11).collect() +wr_s21 = rdd.sample(True, 0.4, 21).collect() +self.assertNotEqual(set(wr_s11), set(wr_s21)) + class ProfilerTests(PySparkTestCase):
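The heart of the fix is how each partition seeds its generator: instead of sharing one seed and discarding `split` values (which left partitions i and i+1 offset by a single draw), the sampler now seeds with `seed ^ split` and burns a few draws so that numerically close seeds decorrelate. A standalone sketch of that idea using only the standard `random` module, no Spark required:

```python
import random


def partition_rng(seed, split, mixing_rounds=10):
    # Seed each partition with seed XOR its partition index, then discard a
    # few values so that numerically close seeds diverge quickly.
    rng = random.Random(seed ^ split)
    for _ in range(mixing_rounds):
        rng.randint(0, 1)
    return rng


if __name__ == "__main__":
    # Before the fix, partitions i and i+1 produced sequences offset by one
    # draw; with per-partition seeds the streams are unrelated.
    for split in range(3):
        rng = partition_rng(seed=42, split=split)
        print(split, [round(rng.random(), 3) for _ in range(5)])
```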
git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample
Repository: spark Updated Branches: refs/heads/branch-1.2 76386e1a2 - a68321400 [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1. ~~~ In [14]: import random In [15]: r1 = random.Random(10) In [16]: r1.randint(0, 1) Out[16]: 1 In [17]: r1.random() Out[17]: 0.4288890546751146 In [18]: r1.random() Out[18]: 0.5780913011344704 In [19]: r2 = random.Random(10) In [20]: r2.randint(0, 1) Out[20]: 1 In [21]: r2.randint(0, 1) Out[21]: 0 In [22]: r2.random() Out[22]: 0.5780913011344704 ~~~ Note: The new tests are not for this bug fix. Author: Xiangrui Meng m...@databricks.com Closes #3010 from mengxr/SPARK-4148 and squashes the following commits: 869ae4b [Xiangrui Meng] move tests tests.py c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample (cherry picked from commit 3cca1962207745814b9d83e791713c91b659c36c) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6832140 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6832140 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6832140 Branch: refs/heads/branch-1.2 Commit: a68321400c1068449698d03cebd0fbf648627133 Parents: 76386e1 Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 12:24:24 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 12:24:47 2014 -0800 -- python/pyspark/rdd.py| 3 --- python/pyspark/rddsampler.py | 11 +-- python/pyspark/tests.py | 15 +++ 3 files changed, 20 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 550c9dd..4f025b9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -316,9 +316,6 @@ class RDD(object): Return a sampled subset of this RDD (relies on numpy and falls back on default random generator if numpy is unavailable). 
- - sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP -[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] assert fraction = 0.0, Negative fraction value: %s % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rddsampler.py -- diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 528a181..f5c3cfd 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -40,14 +40,13 @@ class RDDSamplerBase(object): def initRandomGenerator(self, split): if self._use_numpy: import numpy -self._random = numpy.random.RandomState(self._seed) +self._random = numpy.random.RandomState(self._seed ^ split) else: -self._random = random.Random(self._seed) +self._random = random.Random(self._seed ^ split) -for _ in range(0, split): -# discard the next few values in the sequence to have a -# different seed for the different splits -self._random.randint(0, 2 ** 32 - 1) +# mixing because the initial seeds are close to each other +for _ in xrange(10): +self._random.randint(0, 1) self._split = split self._rand_initialized = True http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/tests.py -- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 37a1289..253a471 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase): self.assertEquals(result.getNumPartitions(), 5) self.assertEquals(result.count(), 3) +def test_sample(self): +rdd = self.sc.parallelize(range(0, 100), 4) +wo = rdd.sample(False, 0.1, 2).collect() +wo_dup = rdd.sample(False, 0.1, 2).collect() +self.assertSetEqual(set(wo), set(wo_dup)) +wr = rdd.sample(True, 0.2, 5).collect() +wr_dup = rdd.sample(True, 0.2, 5).collect() +self.assertSetEqual(set(wr), set(wr_dup)) +wo_s10 = rdd.sample(False, 0.3, 10).collect() +wo_s20 = rdd.sample(False, 0.3, 20).collect() +self.assertNotEqual(set(wo_s10), set(wo_s20)) +wr_s11 = rdd.sample(True, 0.4, 11).collect() +wr_s21 = rdd.sample(True, 0.4,
git commit: [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1
Repository: spark Updated Branches: refs/heads/branch-1.2 a68321400 - fc782896b [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1 instead of `hive.version=0.13.1`. e.g. mvn -Phive -Phive=0.13.1 Note: `hive.version=0.13.1a` is the default property value. However, when explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be selected. References: PR #2685, which resolved a package incompatibility issue with Hive-0.13.1 by introducing a special version Hive-0.13.1a Author: fi code...@gmail.com Closes #3072 from coderfi/master and squashes the following commits: 7ca4b1e [fi] Fixes the `hive-0.13.1` maven profile referencing `hive.version=0.13.1` instead of the Spark compatible `hive.version=0.13.1a` Note: `hive.version=0.13.1a` is the default version. However, when explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be selected. e.g. mvn -Phive -Phive=0.13.1 See PR #2685 (cherry picked from commit df607da025488d6c924d3d70eddb67f5523080d3) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc782896 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc782896 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc782896 Branch: refs/heads/branch-1.2 Commit: fc782896b5d51161feee950107df2acf17e12422 Parents: a683214 Author: fi code...@gmail.com Authored: Mon Nov 3 12:56:56 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 12:57:09 2014 -0800 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc782896/pom.xml -- diff --git a/pom.xml b/pom.xml index 6191cd3..eb61353 100644 --- a/pom.xml +++ b/pom.xml @@ -1359,7 +1359,7 @@ activeByDefaultfalse/activeByDefault /activation properties -hive.version0.13.1/hive.version +hive.version0.13.1a/hive.version hive.version.short0.13.1/hive.version.short derby.version10.10.1.1/derby.version /properties - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1
Repository: spark Updated Branches: refs/heads/master 3cca19622 - df607da02 [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1 instead of `hive.version=0.13.1`. e.g. mvn -Phive -Phive=0.13.1 Note: `hive.version=0.13.1a` is the default property value. However, when explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be selected. References: PR #2685, which resolved a package incompatibility issue with Hive-0.13.1 by introducing a special version Hive-0.13.1a Author: fi code...@gmail.com Closes #3072 from coderfi/master and squashes the following commits: 7ca4b1e [fi] Fixes the `hive-0.13.1` maven profile referencing `hive.version=0.13.1` instead of the Spark compatible `hive.version=0.13.1a` Note: `hive.version=0.13.1a` is the default version. However, when explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be selected. e.g. mvn -Phive -Phive=0.13.1 See PR #2685 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df607da0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df607da0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df607da0 Branch: refs/heads/master Commit: df607da025488d6c924d3d70eddb67f5523080d3 Parents: 3cca196 Author: fi code...@gmail.com Authored: Mon Nov 3 12:56:56 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 12:56:56 2014 -0800 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/df607da0/pom.xml -- diff --git a/pom.xml b/pom.xml index 6191cd3..eb61353 100644 --- a/pom.xml +++ b/pom.xml @@ -1359,7 +1359,7 @@ activeByDefaultfalse/activeByDefault /activation properties -hive.version0.13.1/hive.version +hive.version0.13.1a/hive.version hive.version.short0.13.1/hive.version.short derby.version10.10.1.1/derby.version /properties - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL
Repository: spark Updated Branches: refs/heads/branch-1.2 fc782896b - 292da4ef2 [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL Queries which has 'not like' is not working spark sql. sql(SELECT * FROM records where value not like 'val%') same query works in Spark HiveQL Author: ravipesala ravindra.pes...@huawei.com Closes #3075 from ravipesala/SPARK-4207 and squashes the following commits: 35c11e7 [ravipesala] Supported 'not like' syntax in sql (cherry picked from commit 2b6e1ce6ee7b1ba8160bcbee97f5bbff5c46ca09) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/292da4ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/292da4ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/292da4ef Branch: refs/heads/branch-1.2 Commit: 292da4ef25d6cce23bfde7b9ab663a574dfd2b00 Parents: fc78289 Author: ravipesala ravindra.pes...@huawei.com Authored: Mon Nov 3 13:07:41 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:07:56 2014 -0800 -- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala| 1 + .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 + 2 files changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/292da4ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 00fc4d7..5e613e0 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -242,6 +242,7 @@ class SqlParser extends AbstractSparkSQLParser { | termExpression ~ (RLIKE ~ termExpression) ^^ { case e1 ~ e2 = RLike(e1, e2) } | termExpression ~ (REGEXP ~ termExpression) ^^ { case e1 ~ e2 = RLike(e1, e2) } | termExpression ~ (LIKE ~ termExpression) ^^ { case e1 ~ e2 = Like(e1, e2) } +| termExpression ~ (NOT ~ LIKE ~ termExpression) ^^ { case e1 ~ e2 = Not(Like(e1, e2)) } | termExpression ~ (IN ~ ( ~ rep1sep(termExpression, ,)) ~ ) ^^ { case e1 ~ e2 = In(e1, e2) } http://git-wip-us.apache.org/repos/asf/spark/blob/292da4ef/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6bf4393..702714a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -938,4 +938,9 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer(sql(SELECT key FROM testData WHERE key not between 0 and 10 order by key), (11 to 100).map(i = Seq(i))) } + + test(SPARK-4207 Query which has syntax like 'not like' is not working in Spark SQL) { +checkAnswer(sql(SELECT key FROM testData WHERE value not like '100%' order by key), +(1 to 99).map(i = Seq(i))) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL
Repository: spark Updated Branches: refs/heads/master df607da02 - 2b6e1ce6e [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL Queries which has 'not like' is not working spark sql. sql(SELECT * FROM records where value not like 'val%') same query works in Spark HiveQL Author: ravipesala ravindra.pes...@huawei.com Closes #3075 from ravipesala/SPARK-4207 and squashes the following commits: 35c11e7 [ravipesala] Supported 'not like' syntax in sql Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b6e1ce6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b6e1ce6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b6e1ce6 Branch: refs/heads/master Commit: 2b6e1ce6ee7b1ba8160bcbee97f5bbff5c46ca09 Parents: df607da Author: ravipesala ravindra.pes...@huawei.com Authored: Mon Nov 3 13:07:41 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:07:41 2014 -0800 -- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala| 1 + .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 + 2 files changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b6e1ce6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 00fc4d7..5e613e0 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -242,6 +242,7 @@ class SqlParser extends AbstractSparkSQLParser { | termExpression ~ (RLIKE ~ termExpression) ^^ { case e1 ~ e2 = RLike(e1, e2) } | termExpression ~ (REGEXP ~ termExpression) ^^ { case e1 ~ e2 = RLike(e1, e2) } | termExpression ~ (LIKE ~ termExpression) ^^ { case e1 ~ e2 = Like(e1, e2) } +| termExpression ~ (NOT ~ LIKE ~ termExpression) ^^ { case e1 ~ e2 = Not(Like(e1, e2)) } | termExpression ~ (IN ~ ( ~ rep1sep(termExpression, ,)) ~ ) ^^ { case e1 ~ e2 = In(e1, e2) } http://git-wip-us.apache.org/repos/asf/spark/blob/2b6e1ce6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6bf4393..702714a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -938,4 +938,9 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer(sql(SELECT key FROM testData WHERE key not between 0 and 10 order by key), (11 to 100).map(i = Seq(i))) } + + test(SPARK-4207 Query which has syntax like 'not like' is not working in Spark SQL) { +checkAnswer(sql(SELECT key FROM testData WHERE value not like '100%' order by key), +(1 to 99).map(i = Seq(i))) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
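Because SqlParser backs SQLContext.sql, the new rule is exercised simply by running a query that contains NOT LIKE. A hypothetical PySpark session showing the query shape; the table and data are invented for illustration and loosely mirror the test table in the PR:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="not-like-example")
sqlContext = SQLContext(sc)

# Invented sample data, loosely mirroring the records table in the PR.
records = sc.parallelize([Row(key=i, value="val_%d" % i) for i in range(1, 11)])
sqlContext.inferSchema(records).registerTempTable("records")

# With the parser change, NOT LIKE is accepted by plain Spark SQL
# (previously the same query only worked through HiveQL).
rows = sqlContext.sql("SELECT key FROM records WHERE value NOT LIKE 'val_1%'")
print(rows.collect())
```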
git commit: [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling
Repository: spark Updated Branches: refs/heads/branch-1.2 292da4ef2 - cc5dc4247 [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling This patch will try to infer schema for RDD which has empty value (None, [], {}) in the first row. It will try first 100 rows and merge the types into schema, also merge fields of StructType together. If there is still NullType in schema, then it will show an warning, tell user to try with sampling. If sampling is presented, it will infer schema from all the rows after sampling. Also, add samplingRatio for jsonFile() and jsonRDD() Author: Davies Liu davies@gmail.com Author: Davies Liu dav...@databricks.com Closes #2716 from davies/infer and squashes the following commits: e678f6d [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 34b5c63 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 567dc60 [Davies Liu] update docs 9767b27 [Davies Liu] Merge branch 'master' into infer e48d7fb [Davies Liu] fix tests 29e94d5 [Davies Liu] let NullType inherit from PrimitiveType ee5d524 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 540d1d5 [Davies Liu] merge fields for StructType f93fd84 [Davies Liu] add more tests 3603e00 [Davies Liu] take more rows to infer schema, or infer the schema by sampling the RDD (cherry picked from commit 24544fbce05665ab4999a1fe5aac434d29cd912c) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc5dc424 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc5dc424 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc5dc424 Branch: refs/heads/branch-1.2 Commit: cc5dc4247979dc001302f7af978801b789acdbfa Parents: 292da4e Author: Davies Liu davies@gmail.com Authored: Mon Nov 3 13:17:09 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:17:25 2014 -0800 -- python/pyspark/sql.py | 196 --- python/pyspark/tests.py | 19 ++ .../spark/sql/catalyst/types/dataTypes.scala| 2 +- 3 files changed, 148 insertions(+), 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc5dc424/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 98e41f8..675df08 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -109,6 +109,15 @@ class PrimitiveType(DataType): return self is other +class NullType(PrimitiveType): + +Spark SQL NullType + +The data type representing None, used for the types which has not +been inferred. + + + class StringType(PrimitiveType): Spark SQL StringType @@ -331,7 +340,7 @@ class StructField(DataType): -def __init__(self, name, dataType, nullable, metadata=None): +def __init__(self, name, dataType, nullable=True, metadata=None): Creates a StructField :param name: the name of this field. :param dataType: the data type of this field. 
@@ -484,6 +493,7 @@ def _parse_datatype_json_value(json_value): # Mapping Python types to Spark SQL DataType _type_mappings = { +type(None): NullType, bool: BooleanType, int: IntegerType, long: LongType, @@ -500,22 +510,22 @@ _type_mappings = { def _infer_type(obj): Infer the DataType from obj -if obj is None: -raise ValueError(Can not infer type for None) - dataType = _type_mappings.get(type(obj)) if dataType is not None: return dataType() if isinstance(obj, dict): -if not obj: -raise ValueError(Can not infer type for empty dict) -key, value = obj.iteritems().next() -return MapType(_infer_type(key), _infer_type(value), True) +for key, value in obj.iteritems(): +if key is not None and value is not None: +return MapType(_infer_type(key), _infer_type(value), True) +else: +return MapType(NullType(), NullType(), True) elif isinstance(obj, (list, array)): -if not obj: -raise ValueError(Can not infer type for empty list/array) -return ArrayType(_infer_type(obj[0]), True) +for v in obj: +if v is not None: +return ArrayType(_infer_type(obj[0]), True) +else: +return ArrayType(NullType(), True) else: try: return _infer_schema(obj) @@ -548,60 +558,93 @@ def _infer_schema(row): return StructType(fields) -def _create_converter(obj, dataType): +def _has_nulltype(dt): + Return whether there is NullType in `dt` or not +if isinstance(dt, StructType): +return
git commit: [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling
Repository: spark Updated Branches: refs/heads/master 2b6e1ce6e - 24544fbce [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling This patch will try to infer schema for RDD which has empty value (None, [], {}) in the first row. It will try first 100 rows and merge the types into schema, also merge fields of StructType together. If there is still NullType in schema, then it will show an warning, tell user to try with sampling. If sampling is presented, it will infer schema from all the rows after sampling. Also, add samplingRatio for jsonFile() and jsonRDD() Author: Davies Liu davies@gmail.com Author: Davies Liu dav...@databricks.com Closes #2716 from davies/infer and squashes the following commits: e678f6d [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 34b5c63 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 567dc60 [Davies Liu] update docs 9767b27 [Davies Liu] Merge branch 'master' into infer e48d7fb [Davies Liu] fix tests 29e94d5 [Davies Liu] let NullType inherit from PrimitiveType ee5d524 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer 540d1d5 [Davies Liu] merge fields for StructType f93fd84 [Davies Liu] add more tests 3603e00 [Davies Liu] take more rows to infer schema, or infer the schema by sampling the RDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24544fbc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24544fbc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24544fbc Branch: refs/heads/master Commit: 24544fbce05665ab4999a1fe5aac434d29cd912c Parents: 2b6e1ce Author: Davies Liu davies@gmail.com Authored: Mon Nov 3 13:17:09 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:17:09 2014 -0800 -- python/pyspark/sql.py | 196 --- python/pyspark/tests.py | 19 ++ .../spark/sql/catalyst/types/dataTypes.scala| 2 +- 3 files changed, 148 insertions(+), 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24544fbc/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 98e41f8..675df08 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -109,6 +109,15 @@ class PrimitiveType(DataType): return self is other +class NullType(PrimitiveType): + +Spark SQL NullType + +The data type representing None, used for the types which has not +been inferred. + + + class StringType(PrimitiveType): Spark SQL StringType @@ -331,7 +340,7 @@ class StructField(DataType): -def __init__(self, name, dataType, nullable, metadata=None): +def __init__(self, name, dataType, nullable=True, metadata=None): Creates a StructField :param name: the name of this field. :param dataType: the data type of this field. 
@@ -484,6 +493,7 @@ def _parse_datatype_json_value(json_value): # Mapping Python types to Spark SQL DataType _type_mappings = { +type(None): NullType, bool: BooleanType, int: IntegerType, long: LongType, @@ -500,22 +510,22 @@ _type_mappings = { def _infer_type(obj): Infer the DataType from obj -if obj is None: -raise ValueError(Can not infer type for None) - dataType = _type_mappings.get(type(obj)) if dataType is not None: return dataType() if isinstance(obj, dict): -if not obj: -raise ValueError(Can not infer type for empty dict) -key, value = obj.iteritems().next() -return MapType(_infer_type(key), _infer_type(value), True) +for key, value in obj.iteritems(): +if key is not None and value is not None: +return MapType(_infer_type(key), _infer_type(value), True) +else: +return MapType(NullType(), NullType(), True) elif isinstance(obj, (list, array)): -if not obj: -raise ValueError(Can not infer type for empty list/array) -return ArrayType(_infer_type(obj[0]), True) +for v in obj: +if v is not None: +return ArrayType(_infer_type(obj[0]), True) +else: +return ArrayType(NullType(), True) else: try: return _infer_schema(obj) @@ -548,60 +558,93 @@ def _infer_schema(row): return StructType(fields) -def _create_converter(obj, dataType): +def _has_nulltype(dt): + Return whether there is NullType in `dt` or not +if isinstance(dt, StructType): +return any(_has_nulltype(f.dataType) for f in dt.fields) +elif isinstance(dt, ArrayType): +return _has_nulltype((dt.elementType)) +
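In practice this means a row set whose first record contains None or empty collections no longer aborts schema inference: the first 100 rows are scanned and merged, and a samplingRatio can be supplied when that is still not enough. A hedged PySpark sketch of both paths, assuming the samplingRatio keyword the commit describes and using invented field names:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="infer-schema-example")
sqlContext = SQLContext(sc)

# The first row carries a None value; before this change inferSchema() raised
# "Can not infer type for None". Now later rows fill in the missing type.
rows = sc.parallelize([Row(name="a", score=None), Row(name="b", score=3.0)])
people = sqlContext.inferSchema(rows)
people.printSchema()

# For JSON input, samplingRatio asks Spark to infer the schema from a sampled
# fraction of all rows instead of only the leading ones.
json_rdd = sc.parallelize(['{"name": "a"}', '{"name": "b", "score": 3.0}'])
sampled = sqlContext.jsonRDD(json_rdd, samplingRatio=0.5)
sampled.printSchema()
```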
git commit: [SPARK-4202][SQL] Simple DSL support for Scala UDF
Repository: spark Updated Branches: refs/heads/master 24544fbce - c238fb423 [SPARK-4202][SQL] Simple DSL support for Scala UDF This feature is based on an offline discussion with mengxr, hopefully can be useful for the new MLlib pipeline API. For the following test snippet ```scala case class KeyValue(key: Int, value: String) val testData = sc.parallelize(1 to 10).map(i = KeyValue(i, i.toString)).toSchemaRDD def foo(a: Int, b: String) = a.toString + b ``` the newly introduced DSL enables the following syntax ```scala import org.apache.spark.sql.catalyst.dsl._ testData.select(Star(None), foo.call('key, 'value) as 'result) ``` which is equivalent to ```scala testData.registerTempTable(testData) sqlContext.registerFunction(foo, foo) sql(SELECT *, foo(key, value) AS result FROM testData) ``` Author: Cheng Lian l...@databricks.com Closes #3067 from liancheng/udf-dsl and squashes the following commits: f132818 [Cheng Lian] Adds DSL support for Scala UDF Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c238fb42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c238fb42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c238fb42 Branch: refs/heads/master Commit: c238fb423d1011bd1b1e6201d769b72e52664fc6 Parents: 24544fb Author: Cheng Lian l...@databricks.com Authored: Mon Nov 3 13:20:33 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:20:33 2014 -0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 59 .../org/apache/spark/sql/DslQuerySuite.scala| 17 -- 2 files changed, 72 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c238fb42/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 7e6d770..3314e15 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -22,6 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.types.decimal.Decimal import scala.language.implicitConversions +import scala.reflect.runtime.universe.{TypeTag, typeTag} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ @@ -285,4 +286,62 @@ package object dsl { def writeToFile(path: String) = WriteToFile(path, logicalPlan) } } + + case class ScalaUdfBuilder[T: TypeTag](f: AnyRef) { +def call(args: Expression*) = ScalaUdf(f, ScalaReflection.schemaFor(typeTag[T]).dataType, args) + } + + // scalastyle:off + /** functionToUdfBuilder 1-22 were generated by this script + +(1 to 22).map { x = + val argTypes = Seq.fill(x)(_).mkString(, ) + simplicit def functionToUdfBuilder[T: TypeTag](func: Function$x[$argTypes, T]) = ScalaUdfBuilder(func) +} + */ + + implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function2[_, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function3[_, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function4[_, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function5[_, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def 
functionToUdfBuilder[T: TypeTag](func: Function6[_, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function7[_, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function8[_, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function9[_, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function10[_, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def
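For comparison, the registration-based flow that the commit message calls equivalent is also what PySpark exposes. A hypothetical sketch of that path, assuming the registerFunction API available in this branch and using invented names:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="udf-example")
sqlContext = SQLContext(sc)

data = sc.parallelize([Row(key=i, value=str(i)) for i in range(1, 11)])
sqlContext.inferSchema(data).registerTempTable("testData")

# Register a Python function under a SQL name, then call it from a query --
# the registration-plus-SQL flow that the Scala DSL above is sugar for.
sqlContext.registerFunction("foo", lambda a, b: str(a) + b)
result = sqlContext.sql("SELECT key, foo(key, value) AS result FROM testData")
print(result.collect())
```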
git commit: [SPARK-4202][SQL] Simple DSL support for Scala UDF
Repository: spark Updated Branches: refs/heads/branch-1.2 cc5dc4247 - 572300ba8 [SPARK-4202][SQL] Simple DSL support for Scala UDF This feature is based on an offline discussion with mengxr, hopefully can be useful for the new MLlib pipeline API. For the following test snippet ```scala case class KeyValue(key: Int, value: String) val testData = sc.parallelize(1 to 10).map(i = KeyValue(i, i.toString)).toSchemaRDD def foo(a: Int, b: String) = a.toString + b ``` the newly introduced DSL enables the following syntax ```scala import org.apache.spark.sql.catalyst.dsl._ testData.select(Star(None), foo.call('key, 'value) as 'result) ``` which is equivalent to ```scala testData.registerTempTable(testData) sqlContext.registerFunction(foo, foo) sql(SELECT *, foo(key, value) AS result FROM testData) ``` Author: Cheng Lian l...@databricks.com Closes #3067 from liancheng/udf-dsl and squashes the following commits: f132818 [Cheng Lian] Adds DSL support for Scala UDF (cherry picked from commit c238fb423d1011bd1b1e6201d769b72e52664fc6) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/572300ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/572300ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/572300ba Branch: refs/heads/branch-1.2 Commit: 572300ba8a5f24b52f19d7033a456248da20bfed Parents: cc5dc42 Author: Cheng Lian l...@databricks.com Authored: Mon Nov 3 13:20:33 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:20:51 2014 -0800 -- .../apache/spark/sql/catalyst/dsl/package.scala | 59 .../org/apache/spark/sql/DslQuerySuite.scala| 17 -- 2 files changed, 72 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/572300ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 7e6d770..3314e15 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -22,6 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.types.decimal.Decimal import scala.language.implicitConversions +import scala.reflect.runtime.universe.{TypeTag, typeTag} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ @@ -285,4 +286,62 @@ package object dsl { def writeToFile(path: String) = WriteToFile(path, logicalPlan) } } + + case class ScalaUdfBuilder[T: TypeTag](f: AnyRef) { +def call(args: Expression*) = ScalaUdf(f, ScalaReflection.schemaFor(typeTag[T]).dataType, args) + } + + // scalastyle:off + /** functionToUdfBuilder 1-22 were generated by this script + +(1 to 22).map { x = + val argTypes = Seq.fill(x)(_).mkString(, ) + simplicit def functionToUdfBuilder[T: TypeTag](func: Function$x[$argTypes, T]) = ScalaUdfBuilder(func) +} + */ + + implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function2[_, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function3[_, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function4[_, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit 
def functionToUdfBuilder[T: TypeTag](func: Function5[_, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function6[_, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function7[_, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function8[_, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function9[_, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function10[_, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def functionToUdfBuilder[T: TypeTag](func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func) + + implicit def
git commit: [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed
Repository: spark Updated Branches: refs/heads/master c238fb423 - e83f13e8d [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed CREATE TABLE t1 (a String); CREATE TABLE t1 AS SELECT key FROM src; â throw exception CREATE TABLE if not exists t1 AS SELECT key FROM src; â expect do nothing, currently it will overwrite the t1, which is incorrect. Author: Cheng Hao hao.ch...@intel.com Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits: 194113e [Cheng Hao] fix bug in CTAS when table already existed Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e83f13e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e83f13e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e83f13e8 Branch: refs/heads/master Commit: e83f13e8d37ca33f4e183e977d077221b90c6025 Parents: c238fb4 Author: Cheng Hao hao.ch...@intel.com Authored: Mon Nov 3 13:59:43 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 13:59:43 2014 -0800 -- .../spark/sql/catalyst/analysis/Catalog.scala | 22 .../spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++ .../hive/execution/CreateTableAsSelect.scala| 12 ++- .../sql/hive/execution/SQLQuerySuite.scala | 9 ++-- 4 files changed, 46 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 2059a91..0415d74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -28,6 +28,8 @@ trait Catalog { def caseSensitive: Boolean + def tableExists(db: Option[String], tableName: String): Boolean + def lookupRelation( databaseName: Option[String], tableName: String, @@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { tables.clear() } + override def tableExists(db: Option[String], tableName: String): Boolean = { +val (dbName, tblName) = processDatabaseAndTableName(db, tableName) +tables.get(tblName) match { + case Some(_) = true + case None = false +} + } + override def lookupRelation( databaseName: Option[String], tableName: String, @@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog { // TODO: This doesn't work when the database changes... 
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]() + abstract override def tableExists(db: Option[String], tableName: String): Boolean = { +val (dbName, tblName) = processDatabaseAndTableName(db, tableName) +overrides.get((dbName, tblName)) match { + case Some(_) = true + case None = super.tableExists(db, tableName) +} + } + abstract override def lookupRelation( databaseName: Option[String], tableName: String, @@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog { val caseSensitive: Boolean = true + def tableExists(db: Option[String], tableName: String): Boolean = { +throw new UnsupportedOperationException + } + def lookupRelation( databaseName: Option[String], tableName: String, http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 096b4a0..0baf4c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false + def tableExists(db: Option[String], tableName: String): Boolean = { +val (databaseName, tblName) = processDatabaseAndTableName( + db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) +client.getTable(databaseName, tblName, false) != null + } + def lookupRelation( db: Option[String], tableName: String, http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
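The behavioural contract after the fix: a plain CTAS against an existing table raises an error, while CREATE TABLE IF NOT EXISTS ... AS SELECT leaves the existing table untouched instead of silently overwriting it. A hedged sketch of the three statements from the description, issued through PySpark's HiveContext and assuming a Hive deployment where the src test table exists:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="ctas-example")
hive = HiveContext(sc)

hive.sql("CREATE TABLE t1 (a STRING)").collect()

# Plain CTAS against an existing table is now expected to fail loudly.
try:
    hive.sql("CREATE TABLE t1 AS SELECT key FROM src").collect()
except Exception as e:
    print("CTAS on existing table failed as expected: %s" % e)

# IF NOT EXISTS is expected to be a no-op that leaves t1's data unchanged
# (previously it incorrectly overwrote the table).
hive.sql("CREATE TABLE IF NOT EXISTS t1 AS SELECT key FROM src").collect()
```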
git commit: [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed
Repository: spark Updated Branches: refs/heads/branch-1.2 572300ba8 - 6104754f7 [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed CREATE TABLE t1 (a String); CREATE TABLE t1 AS SELECT key FROM src; â throw exception CREATE TABLE if not exists t1 AS SELECT key FROM src; â expect do nothing, currently it will overwrite the t1, which is incorrect. Author: Cheng Hao hao.ch...@intel.com Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits: 194113e [Cheng Hao] fix bug in CTAS when table already existed (cherry picked from commit e83f13e8d37ca33f4e183e977d077221b90c6025) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6104754f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6104754f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6104754f Branch: refs/heads/branch-1.2 Commit: 6104754f711da9eb0c09daf377bcd750d2d23f8a Parents: 572300b Author: Cheng Hao hao.ch...@intel.com Authored: Mon Nov 3 13:59:43 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 14:00:06 2014 -0800 -- .../spark/sql/catalyst/analysis/Catalog.scala | 22 .../spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++ .../hive/execution/CreateTableAsSelect.scala| 12 ++- .../sql/hive/execution/SQLQuerySuite.scala | 9 ++-- 4 files changed, 46 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 2059a91..0415d74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -28,6 +28,8 @@ trait Catalog { def caseSensitive: Boolean + def tableExists(db: Option[String], tableName: String): Boolean + def lookupRelation( databaseName: Option[String], tableName: String, @@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { tables.clear() } + override def tableExists(db: Option[String], tableName: String): Boolean = { +val (dbName, tblName) = processDatabaseAndTableName(db, tableName) +tables.get(tblName) match { + case Some(_) = true + case None = false +} + } + override def lookupRelation( databaseName: Option[String], tableName: String, @@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog { // TODO: This doesn't work when the database changes... 
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]() + abstract override def tableExists(db: Option[String], tableName: String): Boolean = { +val (dbName, tblName) = processDatabaseAndTableName(db, tableName) +overrides.get((dbName, tblName)) match { + case Some(_) = true + case None = super.tableExists(db, tableName) +} + } + abstract override def lookupRelation( databaseName: Option[String], tableName: String, @@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog { val caseSensitive: Boolean = true + def tableExists(db: Option[String], tableName: String): Boolean = { +throw new UnsupportedOperationException + } + def lookupRelation( databaseName: Option[String], tableName: String, http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 096b4a0..0baf4c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false + def tableExists(db: Option[String], tableName: String): Boolean = { +val (databaseName, tblName) = processDatabaseAndTableName( + db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) +client.getTable(databaseName, tblName, false) != null + } + def lookupRelation( db: Option[String], tableName: String,
git commit: [SQL] More aggressive defaults
Repository: spark Updated Branches: refs/heads/branch-1.2 6104754f7 - 51985f78c [SQL] More aggressive defaults - Turns on compression for in-memory cached data by default - Changes the default parquet compression format back to gzip (we have seen more OOMs with production workloads due to the way Snappy allocates memory) - Ups the batch size to 10,000 rows - Increases the broadcast threshold to 10mb. - Uses our parquet implementation instead of the hive one by default. - Cache parquet metadata by default. Author: Michael Armbrust mich...@databricks.com Closes #3064 from marmbrus/fasterDefaults and squashes the following commits: 97ee9f8 [Michael Armbrust] parquet codec docs e641694 [Michael Armbrust] Remote also a12866a [Michael Armbrust] Cache metadata. 2d73acc [Michael Armbrust] Update docs defaults. d63d2d5 [Michael Armbrust] document parquet option da373f9 [Michael Armbrust] More aggressive defaults (cherry picked from commit 25bef7e6951301e93004567fc0cef96bf8d1a224) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51985f78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51985f78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51985f78 Branch: refs/heads/branch-1.2 Commit: 51985f78ca5f728f8b9233b703110f541d27b274 Parents: 6104754 Author: Michael Armbrust mich...@databricks.com Authored: Mon Nov 3 14:08:27 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 14:08:40 2014 -0800 -- docs/sql-programming-guide.md | 18 +- .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 +- .../sql/parquet/ParquetTableOperations.scala | 6 +++--- .../org/apache/spark/sql/hive/HiveContext.scala | 2 +- 4 files changed, 22 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51985f78/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d4ade93..e399fec 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or /tr tr tdcodespark.sql.parquet.cacheMetadata/code/td - tdfalse/td + tdtrue/td td Turns on caching of Parquet schema metadata. Can speed up querying of static data. /td /tr tr tdcodespark.sql.parquet.compression.codec/code/td - tdsnappy/td + tdgzip/td td Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. /td /tr +tr + tdcodespark.sql.hive.convertMetastoreParquet/code/td + tdtrue/td + td +When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in +support. + /td +/tr /table ## JSON Datasets @@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL trthProperty Name/ththDefault/ththMeaning/th/tr tr tdcodespark.sql.inMemoryColumnarStorage.compressed/code/td - tdfalse/td + tdtrue/td td When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. @@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL /tr tr tdcodespark.sql.inMemoryColumnarStorage.batchSize/code/td - td1000/td + td1/td td Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar trthProperty Name/ththDefault/ththMeaning/th/tr tr tdcodespark.sql.autoBroadcastJoinThreshold/code/td -td1/td +td10485760 (10 MB)/td td Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently http://git-wip-us.apache.org/repos/asf/spark/blob/51985f78/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 07e6e2e..279495a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -79,13 +79,13 @@ private[sql] trait SQLConf { private[spark] def dialect: String = getConf(DIALECT, sql) /** When
git commit: [SQL] More aggressive defaults
Repository: spark Updated Branches: refs/heads/master e83f13e8d - 25bef7e69 [SQL] More aggressive defaults - Turns on compression for in-memory cached data by default - Changes the default parquet compression format back to gzip (we have seen more OOMs with production workloads due to the way Snappy allocates memory) - Ups the batch size to 10,000 rows - Increases the broadcast threshold to 10mb. - Uses our parquet implementation instead of the hive one by default. - Cache parquet metadata by default. Author: Michael Armbrust mich...@databricks.com Closes #3064 from marmbrus/fasterDefaults and squashes the following commits: 97ee9f8 [Michael Armbrust] parquet codec docs e641694 [Michael Armbrust] Remote also a12866a [Michael Armbrust] Cache metadata. 2d73acc [Michael Armbrust] Update docs defaults. d63d2d5 [Michael Armbrust] document parquet option da373f9 [Michael Armbrust] More aggressive defaults Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25bef7e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25bef7e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25bef7e6 Branch: refs/heads/master Commit: 25bef7e6951301e93004567fc0cef96bf8d1a224 Parents: e83f13e Author: Michael Armbrust mich...@databricks.com Authored: Mon Nov 3 14:08:27 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 14:08:27 2014 -0800 -- docs/sql-programming-guide.md | 18 +- .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 +- .../sql/parquet/ParquetTableOperations.scala | 6 +++--- .../org/apache/spark/sql/hive/HiveContext.scala | 2 +- 4 files changed, 22 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d4ade93..e399fec 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or /tr tr tdcodespark.sql.parquet.cacheMetadata/code/td - tdfalse/td + tdtrue/td td Turns on caching of Parquet schema metadata. Can speed up querying of static data. /td /tr tr tdcodespark.sql.parquet.compression.codec/code/td - tdsnappy/td + tdgzip/td td Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. /td /tr +tr + tdcodespark.sql.hive.convertMetastoreParquet/code/td + tdtrue/td + td +When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in +support. + /td +/tr /table ## JSON Datasets @@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL trthProperty Name/ththDefault/ththMeaning/th/tr tr tdcodespark.sql.inMemoryColumnarStorage.compressed/code/td - tdfalse/td + tdtrue/td td When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. @@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL /tr tr tdcodespark.sql.inMemoryColumnarStorage.batchSize/code/td - td1000/td + td1/td td Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar trthProperty Name/ththDefault/ththMeaning/th/tr tr tdcodespark.sql.autoBroadcastJoinThreshold/code/td -td1/td +td10485760 (10 MB)/td td Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 07e6e2e..279495a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -79,13 +79,13 @@ private[sql] trait SQLConf { private[spark] def dialect: String = getConf(DIALECT, sql) /** When true tables cached using the in-memory columnar caching will be compressed. */ - private[spark] def useCompression: Boolean =
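Any of the new defaults above can still be overridden per application through the `setConf` method on SQLContext that the documentation diff mentions. A minimal sketch, assuming an existing SparkContext `sc`; the property names and values are the ones shown in the patch:

~~~
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Revert Parquet output to Snappy if the new gzip default is too slow for a workload
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Lower the columnar-cache batch size if the new 10,000-row default risks OOMs
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

// Disable automatic broadcast joins entirely
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
~~~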
git commit: SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...
Repository: spark Updated Branches: refs/heads/master 25bef7e69 - 28128150e SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta... ...ntiation Author: Sandy Ryza sa...@cloudera.com Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits: 8d2e70e [Sandy Ryza] Kostas's review feedback e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28128150 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28128150 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28128150 Branch: refs/heads/master Commit: 28128150e7e0c2b7d1c483e67214bdaef59f7d75 Parents: 25bef7e Author: Sandy Ryza sa...@cloudera.com Authored: Mon Nov 3 15:19:01 2014 -0800 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Nov 3 15:19:01 2014 -0800 -- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 25 ++ .../org/apache/spark/rdd/NewHadoopRDD.scala | 26 ++- .../spark/metrics/InputMetricsSuite.scala | 27 ++-- 3 files changed, 53 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 946fb56..a157e36 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -211,20 +211,11 @@ class HadoopRDD[K, V]( val split = theSplit.asInstanceOf[HadoopPartition] logInfo(Input split: + split.inputSplit) - var reader: RecordReader[K, V] = null val jobConf = getJobConf() - val inputFormat = getInputFormat(jobConf) - HadoopRDD.addLocalConfiguration(new SimpleDateFormat(MMddHHmm).format(createTime), -context.stageId, theSplit.index, context.attemptId.toInt, jobConf) - reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) - - // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener{ context = closeIfNeeded() } - val key: K = reader.createKey() - val value: V = reader.createValue() val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - // Find a function that will return the FileSystem bytes read by this thread. + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) { SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf) @@ -234,6 +225,18 @@ class HadoopRDD[K, V]( if (bytesReadCallback.isDefined) { context.taskMetrics.inputMetrics = Some(inputMetrics) } + + var reader: RecordReader[K, V] = null + val inputFormat = getInputFormat(jobConf) + HadoopRDD.addLocalConfiguration(new SimpleDateFormat(MMddHHmm).format(createTime), +context.stageId, theSplit.index, context.attemptId.toInt, jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + + // Register an on-task-completion callback to close the input stream. 
+ context.addTaskCompletionListener{ context = closeIfNeeded() } + val key: K = reader.createKey() + val value: V = reader.createValue() + var recordsSinceMetricsUpdate = 0 override def getNext() = { http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 6d6b867..351e145 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -107,20 +107,10 @@ class NewHadoopRDD[K, V]( val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo(Input split: + split.serializableHadoopSplit) val conf = confBroadcast.value.value - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) - val format = inputFormatClass.newInstance - format match { -case configurable: Configurable
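The point of the reordering is that a RecordReader's constructor may already pull bytes from the FileSystem, so the bytes-read callback has to snapshot the thread's counter before the reader is built. A simplified, self-contained sketch of that pattern (not Spark's actual classes; `bytesReadSoFar` and `openReader` are hypothetical stand-ins):

~~~
// Capture a baseline of bytes read by this thread *before* constructing the reader,
// so bytes pulled in by the reader's constructor are attributed to this task.
def trackedOpen[R](bytesReadSoFar: () => Long)(openReader: => R): (R, () => Long) = {
  val baseline = bytesReadSoFar()                     // snapshot first
  val reader = openReader                             // constructor may read ahead
  val bytesForThisTask = () => bytesReadSoFar() - baseline
  (reader, bytesForThisTask)
}
~~~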
git commit: SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...
Repository: spark Updated Branches: refs/heads/branch-1.2 51985f78c - fa86d862f SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta... ...ntiation Author: Sandy Ryza sa...@cloudera.com Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits: 8d2e70e [Sandy Ryza] Kostas's review feedback e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation (cherry picked from commit 28128150e7e0c2b7d1c483e67214bdaef59f7d75) Signed-off-by: Patrick Wendell pwend...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa86d862 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa86d862 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa86d862 Branch: refs/heads/branch-1.2 Commit: fa86d862f98cfea3d9afff6e61b3141c9b08f949 Parents: 51985f7 Author: Sandy Ryza sa...@cloudera.com Authored: Mon Nov 3 15:19:01 2014 -0800 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Nov 3 15:19:13 2014 -0800 -- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 25 ++ .../org/apache/spark/rdd/NewHadoopRDD.scala | 26 ++- .../spark/metrics/InputMetricsSuite.scala | 27 ++-- 3 files changed, 53 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa86d862/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 946fb56..a157e36 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -211,20 +211,11 @@ class HadoopRDD[K, V]( val split = theSplit.asInstanceOf[HadoopPartition] logInfo(Input split: + split.inputSplit) - var reader: RecordReader[K, V] = null val jobConf = getJobConf() - val inputFormat = getInputFormat(jobConf) - HadoopRDD.addLocalConfiguration(new SimpleDateFormat(MMddHHmm).format(createTime), -context.stageId, theSplit.index, context.attemptId.toInt, jobConf) - reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) - - // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener{ context = closeIfNeeded() } - val key: K = reader.createKey() - val value: V = reader.createValue() val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - // Find a function that will return the FileSystem bytes read by this thread. + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) { SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf) @@ -234,6 +225,18 @@ class HadoopRDD[K, V]( if (bytesReadCallback.isDefined) { context.taskMetrics.inputMetrics = Some(inputMetrics) } + + var reader: RecordReader[K, V] = null + val inputFormat = getInputFormat(jobConf) + HadoopRDD.addLocalConfiguration(new SimpleDateFormat(MMddHHmm).format(createTime), +context.stageId, theSplit.index, context.attemptId.toInt, jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + + // Register an on-task-completion callback to close the input stream. 
+ context.addTaskCompletionListener{ context = closeIfNeeded() } + val key: K = reader.createKey() + val value: V = reader.createValue() + var recordsSinceMetricsUpdate = 0 override def getNext() = { http://git-wip-us.apache.org/repos/asf/spark/blob/fa86d862/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 6d6b867..351e145 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -107,20 +107,10 @@ class NewHadoopRDD[K, V]( val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo(Input split: + split.serializableHadoopSplit) val conf = confBroadcast.value.value - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext =
git commit: [SQL] Convert arguments to Scala UDFs
Repository: spark Updated Branches: refs/heads/master 28128150e - 15b58a223 [SQL] Convert arguments to Scala UDFs Author: Michael Armbrust mich...@databricks.com Closes #3077 from marmbrus/udfsWithUdts and squashes the following commits: 34b5f27 [Michael Armbrust] style 504adef [Michael Armbrust] Convert arguments to Scala UDFs Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15b58a22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15b58a22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15b58a22 Branch: refs/heads/master Commit: 15b58a2234ab7ba30c9c0cbb536177a3c725e350 Parents: 2812815 Author: Michael Armbrust mich...@databricks.com Authored: Mon Nov 3 18:04:51 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 18:04:51 2014 -0800 -- .../sql/catalyst/expressions/ScalaUdf.scala | 560 ++- .../apache/spark/sql/UserDefinedTypeSuite.scala | 18 +- 2 files changed, 316 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/15b58a22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala index fa1786e..18c96da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala @@ -34,320 +34,366 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi override def toString = sscalaUDF(${children.mkString(,)}) + // scalastyle:off + /** This method has been generated by this script (1 to 22).map { x = val anys = (1 to x).map(x = Any).reduce(_ + , + _) - val evals = (0 to x - 1).map(x = schildren($x).eval(input)).reduce(_ + ,\n + _) + val evals = (0 to x - 1).map(x = s ScalaReflection.convertToScala(children($x).eval(input), children($x).dataType)).reduce(_ + ,\n + _) s case $x = function.asInstanceOf[($anys) = Any]( - $evals) +$evals) -} +}.foreach(println) */ - // scalastyle:off override def eval(input: Row): Any = { val result = children.size match { case 0 = function.asInstanceOf[() = Any]() - case 1 = function.asInstanceOf[(Any) = Any](children(0).eval(input)) + case 1 = +function.asInstanceOf[(Any) = Any]( + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType)) + + case 2 = function.asInstanceOf[(Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType)) + + case 3 = function.asInstanceOf[(Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType)) + + case 4 = function.asInstanceOf[(Any, Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input), - children(3).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + 
ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType), + ScalaReflection.convertToScala(children(3).eval(input), children(3).dataType)) + + case 5 = function.asInstanceOf[(Any, Any, Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input), - children(3).eval(input), - children(4).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType), + ScalaReflection.convertToScala(children(3).eval(input), children(3).dataType), + ScalaReflection.convertToScala(children(4).eval(input),
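The long match in the patch is machine-generated: the commented script emits one `case` arm per arity from 1 to 22, wrapping every child's evaluated value in `ScalaReflection.convertToScala` before invoking the user's function. A standalone restatement of that generation idea (plain Scala with no Spark dependencies; `convertToScala` here is only a placeholder name):

~~~
// Emit one match arm per function arity; each argument is converted before the call.
(1 to 22).foreach { x =>
  val anys = (1 to x).map(_ => "Any").mkString(", ")
  val evals = (0 until x)
    .map(i => s"  convertToScala(children($i).eval(input), children($i).dataType)")
    .mkString(",\n")
  println(s"case $x => function.asInstanceOf[($anys) => Any](\n$evals)")
}
~~~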
git commit: [SQL] Convert arguments to Scala UDFs
Repository: spark Updated Branches: refs/heads/branch-1.2 fa86d862f - 52db2b942 [SQL] Convert arguments to Scala UDFs Author: Michael Armbrust mich...@databricks.com Closes #3077 from marmbrus/udfsWithUdts and squashes the following commits: 34b5f27 [Michael Armbrust] style 504adef [Michael Armbrust] Convert arguments to Scala UDFs (cherry picked from commit 15b58a2234ab7ba30c9c0cbb536177a3c725e350) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52db2b94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52db2b94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52db2b94 Branch: refs/heads/branch-1.2 Commit: 52db2b9429e00d8ed398a2432ad6a26cd1e5920c Parents: fa86d86 Author: Michael Armbrust mich...@databricks.com Authored: Mon Nov 3 18:04:51 2014 -0800 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Nov 3 18:05:02 2014 -0800 -- .../sql/catalyst/expressions/ScalaUdf.scala | 560 ++- .../apache/spark/sql/UserDefinedTypeSuite.scala | 18 +- 2 files changed, 316 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52db2b94/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala index fa1786e..18c96da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala @@ -34,320 +34,366 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi override def toString = sscalaUDF(${children.mkString(,)}) + // scalastyle:off + /** This method has been generated by this script (1 to 22).map { x = val anys = (1 to x).map(x = Any).reduce(_ + , + _) - val evals = (0 to x - 1).map(x = schildren($x).eval(input)).reduce(_ + ,\n + _) + val evals = (0 to x - 1).map(x = s ScalaReflection.convertToScala(children($x).eval(input), children($x).dataType)).reduce(_ + ,\n + _) s case $x = function.asInstanceOf[($anys) = Any]( - $evals) +$evals) -} +}.foreach(println) */ - // scalastyle:off override def eval(input: Row): Any = { val result = children.size match { case 0 = function.asInstanceOf[() = Any]() - case 1 = function.asInstanceOf[(Any) = Any](children(0).eval(input)) + case 1 = +function.asInstanceOf[(Any) = Any]( + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType)) + + case 2 = function.asInstanceOf[(Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType)) + + case 3 = function.asInstanceOf[(Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType)) + + case 4 = function.asInstanceOf[(Any, Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input), - children(3).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType), + ScalaReflection.convertToScala(children(3).eval(input), children(3).dataType)) + + case 5 = function.asInstanceOf[(Any, Any, Any, Any, Any) = Any]( - children(0).eval(input), - children(1).eval(input), - children(2).eval(input), - children(3).eval(input), - children(4).eval(input)) + ScalaReflection.convertToScala(children(0).eval(input), children(0).dataType), + ScalaReflection.convertToScala(children(1).eval(input), children(1).dataType), + ScalaReflection.convertToScala(children(2).eval(input), children(2).dataType), +
git commit: [SPARK-4168][WebUI] web stages number should show correctly when stages are more than 1000
Repository: spark Updated Branches: refs/heads/master 15b58a223 - 97a466eca [SPARK-4168][WebUI] web statges number should show correctly when stages are more than 1000 The number of completed stages and failed stages showed on webUI will always be less than 1000. This is really misleading when there are already thousands of stages completed or failed. The number should be correct even when only partial stages listed on the webUI (stage info will be removed if the number is too large). Author: Zhang, Liye liye.zh...@intel.com Closes #3035 from liyezhang556520/webStageNum and squashes the following commits: d9e29fb [Zhang, Liye] add detailed comments for variables 4ea8fd1 [Zhang, Liye] change variable name accroding to comments f4c404d [Zhang, Liye] [SPARK-4168][WebUI] web statges number should show correctly when stages are more than 1000 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a466ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a466ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a466ec Branch: refs/heads/master Commit: 97a466eca0a629f17e9662ca2b59eeca99142c54 Parents: 15b58a2 Author: Zhang, Liye liye.zh...@intel.com Authored: Mon Nov 3 18:17:32 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Mon Nov 3 18:17:32 2014 -0800 -- .../org/apache/spark/ui/jobs/JobProgressListener.scala| 9 + .../scala/org/apache/spark/ui/jobs/JobProgressPage.scala | 10 ++ 2 files changed, 15 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97a466ec/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b520736..e322340 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -59,6 +59,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val failedStages = ListBuffer[StageInfo]() val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] val stageIdToInfo = new HashMap[StageId, StageInfo] + + // Number of completed and failed stages, may not actually equal to completedStages.size and + // failedStages.size respectively due to completedStage and failedStages only maintain the latest + // part of the stages, the earlier ones will be removed when there are too many stages for + // memory sake. + var numCompletedStages = 0 + var numFailedStages = 0 // Map from pool name to a hash map (map from stage id to StageInfo). 
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() @@ -110,9 +117,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { activeStages.remove(stage.stageId) if (stage.failureReason.isEmpty) { completedStages += stage + numCompletedStages += 1 trimIfNecessary(completedStages) } else { failedStages += stage + numFailedStages += 1 trimIfNecessary(failedStages) } } http://git-wip-us.apache.org/repos/asf/spark/blob/97a466ec/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index 6e718ee..83a7898 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -34,7 +34,9 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage() listener.synchronized { val activeStages = listener.activeStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq + val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq + val numFailedStages = listener.numFailedStages val now = System.currentTimeMillis val activeStagesTable = @@ -69,11 +71,11 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage() /li li a href=#completedstrongCompleted Stages:/strong/a - {completedStages.size} + {numCompletedStages} /li li a href=#failedstrongFailed Stages:/strong/a - {failedStages.size} + {numFailedStages} /li /ul /div @@ -86,9 +88,9
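The fix keeps dedicated counters that are only ever incremented, while the backing ListBuffers are still trimmed to bound memory; the UI then reads the counters instead of `.size`. A minimal standalone sketch of that pattern (hypothetical names, not the Spark listener itself):

~~~
import scala.collection.mutable.ListBuffer

class StageTracker(retained: Int) {
  val completedStages = ListBuffer[String]()
  var numCompletedStages = 0   // true total, survives trimming

  def onStageCompleted(stage: String): Unit = {
    completedStages += stage
    numCompletedStages += 1
    if (completedStages.size > retained) {
      // Drop the oldest entries; the counter keeps the real total for display.
      completedStages.trimStart(completedStages.size - retained)
    }
  }
}
~~~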
git commit: [SPARK-611] Display executor thread dumps in web UI
Repository: spark Updated Branches: refs/heads/master 97a466eca - 4f035dd2c [SPARK-611] Display executor thread dumps in web UI This patch allows executor thread dumps to be collected on-demand and viewed in the Spark web UI. The thread dumps are collected using Thread.getAllStackTraces(). To allow remote thread dumps to be triggered from the web UI, I added a new `ExecutorActor` that runs inside of the Executor actor system and responds to RPCs from the driver. The driver's mechanism for obtaining a reference to this actor is a little bit hacky: it uses the block manager master actor to determine the host/port of the executor actor systems in order to construct ActorRefs to ExecutorActor. Unfortunately, I couldn't find a much cleaner way to do this without a big refactoring of the executor - driver communication. Screenshots: ![image](https://cloud.githubusercontent.com/assets/50748/4781793/7e7a0776-5cbf-11e4-874d-a91cd04620bd.png) ![image](https://cloud.githubusercontent.com/assets/50748/4781794/8bce76aa-5cbf-11e4-8d13-8477748c9f7e.png) ![image](https://cloud.githubusercontent.com/assets/50748/4781797/bd11a8b8-5cbf-11e4-9ad7-a7459467ec8e.png) Author: Josh Rosen joshro...@databricks.com Closes #2944 from JoshRosen/jstack-in-web-ui and squashes the following commits: 3c21a5d [Josh Rosen] Address review comments: 880f7f7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui f719266 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui 19707b0 [Josh Rosen] Add one comment. 127a130 [Josh Rosen] Update to use SparkContext.DRIVER_IDENTIFIER b8e69aa [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui 3dfc2d4 [Josh Rosen] Add missing file. bc1e675 [Josh Rosen] Undo some leftover changes from the earlier approach. f4ac1c1 [Josh Rosen] Switch to on-demand collection of thread dumps dfec08b [Josh Rosen] Add option to disable thread dumps in UI. 4c87d7f [Josh Rosen] Use separate RPC for sending thread dumps. 2b8bdf3 [Josh Rosen] Enable thread dumps from the driver when running in non-local mode. cc3e6b3 [Josh Rosen] Fix test code in DAGSchedulerSuite. 87b8b65 [Josh Rosen] Add new listener event for thread dumps. 8c10216 [Josh Rosen] Add missing file. 
0f198ac [Josh Rosen] [SPARK-611] Display executor thread dumps in web UI Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f035dd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f035dd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f035dd2 Branch: refs/heads/master Commit: 4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84 Parents: 97a466e Author: Josh Rosen joshro...@databricks.com Authored: Mon Nov 3 18:18:47 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Mon Nov 3 18:18:47 2014 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 29 +++- .../executor/CoarseGrainedExecutorBackend.scala | 3 +- .../org/apache/spark/executor/Executor.scala| 7 +- .../apache/spark/executor/ExecutorActor.scala | 41 +++ .../spark/storage/BlockManagerMaster.scala | 4 ++ .../spark/storage/BlockManagerMasterActor.scala | 18 + .../spark/storage/BlockManagerMessages.scala| 2 + .../spark/ui/exec/ExecutorThreadDumpPage.scala | 73 .../apache/spark/ui/exec/ExecutorsPage.scala| 15 +++- .../org/apache/spark/ui/exec/ExecutorsTab.scala | 8 ++- .../scala/org/apache/spark/util/AkkaUtils.scala | 14 .../apache/spark/util/ThreadStackTrace.scala| 27 .../scala/org/apache/spark/util/Utils.scala | 13 13 files changed, 247 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8b4db78..40444c2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,9 +21,8 @@ import scala.language.implicitConversions import java.io._ import java.net.URI -import java.util.Arrays +import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger -import java.util.{Properties, UUID} import java.util.UUID.randomUUID import scala.collection.{Map, Set} import scala.collection.generic.Growable @@ -41,6 +40,7 @@ import akka.actor.Props import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.executor.TriggerThreadDump import
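The dumps themselves come from the standard JDK call `Thread.getAllStackTraces()` on the executor JVM; the new actor and UI page only trigger and render that collection remotely. A minimal sketch of gathering such a dump locally, using only JDK and Scala standard-library APIs:

~~~
import scala.collection.JavaConverters._

// Collect a formatted stack trace for every live thread in this JVM.
def localThreadDump(): Seq[String] = {
  Thread.getAllStackTraces.asScala.toSeq.map { case (thread, frames) =>
    val header = s"Thread ${thread.getId}: ${thread.getName} (${thread.getState})"
    (header +: frames.map("    at " + _.toString)).mkString("\n")
  }
}
~~~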
git commit: [FIX][MLLIB] fix seed in BaggedPointSuite
Repository: spark Updated Branches: refs/heads/master 4f035dd2c - c5912ecc7 [FIX][MLLIB] fix seed in BaggedPointSuite Saw Jenkins test failures due to random seeds. jkbradley manishamde Author: Xiangrui Meng m...@databricks.com Closes #3084 from mengxr/fix-baggedpoint-suite and squashes the following commits: f735a43 [Xiangrui Meng] fix seed in BaggedPointSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5912ecc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5912ecc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5912ecc Branch: refs/heads/master Commit: c5912ecc7b392a13089ae735c07c2d7256de36c6 Parents: 4f035dd Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 18:50:37 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 18:50:37 2014 -0800 -- .../apache/spark/mllib/tree/impl/BaggedPointSuite.scala | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c5912ecc/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala index c0a62e0..5cb4332 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala @@ -30,7 +30,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { test(BaggedPoint RDD: without subsampling) { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) -val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false) +val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, 42) baggedRDD.collect().foreach { baggedPoint = assert(baggedPoint.subsampleWeights.size == 1 baggedPoint.subsampleWeights(0) == 1) } @@ -44,7 +44,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -60,7 +60,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -75,7 +75,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, false) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, false, seed) val subsampleCounts: 
Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -91,7 +91,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail:
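The flaky runs came from implicit, non-deterministic seeds; threading an explicit seed through `convertToBaggedRDD` makes every Jenkins run reproducible. The same principle in a tiny self-contained example:

~~~
import scala.util.Random

// With a fixed seed, the "random" subsample weights are identical on every run,
// so assertions about their statistics cannot flake.
def subsampleWeights(numPoints: Int, seed: Long): Seq[Double] = {
  val rng = new Random(seed)
  Seq.fill(numPoints)(if (rng.nextDouble() < 0.5) 0.0 else 1.0)
}

assert(subsampleWeights(1000, seed = 42L) == subsampleWeights(1000, seed = 42L))
~~~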
git commit: [FIX][MLLIB] fix seed in BaggedPointSuite
Repository: spark Updated Branches: refs/heads/branch-1.2 52db2b942 - 0826eed9c [FIX][MLLIB] fix seed in BaggedPointSuite Saw Jenkins test failures due to random seeds. jkbradley manishamde Author: Xiangrui Meng m...@databricks.com Closes #3084 from mengxr/fix-baggedpoint-suite and squashes the following commits: f735a43 [Xiangrui Meng] fix seed in BaggedPointSuite (cherry picked from commit c5912ecc7b392a13089ae735c07c2d7256de36c6) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0826eed9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0826eed9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0826eed9 Branch: refs/heads/branch-1.2 Commit: 0826eed9c84a73544e3d8289834c8b5ebac47e03 Parents: 52db2b9 Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 18:50:37 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 18:50:51 2014 -0800 -- .../apache/spark/mllib/tree/impl/BaggedPointSuite.scala | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0826eed9/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala index c0a62e0..5cb4332 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala @@ -30,7 +30,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { test(BaggedPoint RDD: without subsampling) { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) -val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false) +val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, 42) baggedRDD.collect().foreach { baggedPoint = assert(baggedPoint.subsampleWeights.size == 1 baggedPoint.subsampleWeights(0) == 1) } @@ -44,7 +44,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -60,7 +60,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -75,7 +75,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 
numSubsamples, false) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, false, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) @@ -91,7 +91,7 @@ class BaggedPointSuite extends FunSuite with LocalSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed = - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false, seed) val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01)
git commit: [SPARK-4192][SQL] Internal API for Python UDT
Repository: spark Updated Branches: refs/heads/master c5912ecc7 - 04450d115 [SPARK-4192][SQL] Internal API for Python UDT Following #2919, this PR adds Python UDT (for internal use only) with tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we need to convert user-type instances into SQL recognizable data. In the current implementation, a Python UDT must be paired with a Scala UDT for serialization on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and Python. marmbrus jkbradley davies Author: Xiangrui Meng m...@databricks.com Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits: acff637 [Xiangrui Meng] merge master dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well 2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion 7c4a6a9 [Xiangrui Meng] address comments 75223db [Xiangrui Meng] minor update f740379 [Xiangrui Meng] remove UDT from default imports e98d9d0 [Xiangrui Meng] fix py style 4e84fce [Xiangrui Meng] remove local hive tests and add more tests 39f19e0 [Xiangrui Meng] add tests b7f666d [Xiangrui Meng] add Python UDT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04450d11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04450d11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04450d11 Branch: refs/heads/master Commit: 04450d11548cfb25d4fb77d4a33e3a7cd4254183 Parents: c5912ec Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 19:29:11 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 19:29:11 2014 -0800 -- python/pyspark/sql.py | 206 ++- python/pyspark/tests.py | 93 - .../spark/sql/catalyst/types/dataTypes.scala| 9 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 + .../apache/spark/sql/execution/pythonUdfs.scala | 5 + .../apache/spark/sql/test/ExamplePointUDT.scala | 64 ++ .../sql/types/util/DataTypeConversions.scala| 1 - 7 files changed, 375 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04450d11/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 675df08..d16c18b 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -417,6 +417,75 @@ class StructType(DataType): return StructType([StructField.fromJson(f) for f in json[fields]]) +class UserDefinedType(DataType): + +:: WARN: Spark Internal Use Only :: +SQL User-Defined Type (UDT). + + +@classmethod +def typeName(cls): +return cls.__name__.lower() + +@classmethod +def sqlType(cls): + +Underlying SQL storage type for this UDT. + +raise NotImplementedError(UDT must implement sqlType().) + +@classmethod +def module(cls): + +The Python module of the UDT. + +raise NotImplementedError(UDT must implement module().) + +@classmethod +def scalaUDT(cls): + +The class name of the paired Scala UDT. + +raise NotImplementedError(UDT must have a paired Scala UDT.) + +def serialize(self, obj): + +Converts the a user-type object into a SQL datum. + +raise NotImplementedError(UDT must implement serialize().) + +def deserialize(self, datum): + +Converts a SQL datum into a user-type object. + +raise NotImplementedError(UDT must implement deserialize().) 
+ +def json(self): +return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True) + +def jsonValue(self): +schema = { +type: udt, +class: self.scalaUDT(), +pyClass: %s.%s % (self.module(), type(self).__name__), +sqlType: self.sqlType().jsonValue() +} +return schema + +@classmethod +def fromJson(cls, json): +pyUDT = json[pyClass] +split = pyUDT.rfind(.) +pyModule = pyUDT[:split] +pyClass = pyUDT[split+1:] +m = __import__(pyModule, globals(), locals(), [pyClass], -1) +UDT = getattr(m, pyClass) +return UDT() + +def __eq__(self, other): +return type(self) == type(other) + + _all_primitive_types = dict((v.typeName(), v) for v in globals().itervalues() if type(v) is PrimitiveTypeSingleton and @@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string): ... complex_arraytype, False) check_datatype(complex_maptype) True + check_datatype(ExamplePointUDT()) +
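The Python `UserDefinedType` added here is essentially a serialize/deserialize contract between a user-level class and a SQL-storable representation, plus a pointer to a paired Scala UDT for the JVM side. A language-neutral sketch of that contract in Scala (illustrative types only, not Spark's internal classes):

~~~
// A UDT maps a user-level type T to an underlying SQL-storable representation S.
trait SimpleUDT[T, S] {
  def sqlType: String              // underlying storage type, e.g. "array<double>"
  def serialize(obj: T): S         // user object -> SQL datum
  def deserialize(datum: S): T     // SQL datum -> user object
}

case class Point2D(x: Double, y: Double)

object Point2DUDT extends SimpleUDT[Point2D, Seq[Double]] {
  def sqlType = "array<double>"
  def serialize(p: Point2D): Seq[Double] = Seq(p.x, p.y)
  def deserialize(datum: Seq[Double]): Point2D = Point2D(datum(0), datum(1))
}
~~~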
git commit: [SPARK-4192][SQL] Internal API for Python UDT
Repository: spark Updated Branches: refs/heads/branch-1.2 0826eed9c - 42d02db86 [SPARK-4192][SQL] Internal API for Python UDT Following #2919, this PR adds Python UDT (for internal use only) with tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we need to convert user-type instances into SQL recognizable data. In the current implementation, a Python UDT must be paired with a Scala UDT for serialization on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and Python. marmbrus jkbradley davies Author: Xiangrui Meng m...@databricks.com Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits: acff637 [Xiangrui Meng] merge master dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well 2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion 7c4a6a9 [Xiangrui Meng] address comments 75223db [Xiangrui Meng] minor update f740379 [Xiangrui Meng] remove UDT from default imports e98d9d0 [Xiangrui Meng] fix py style 4e84fce [Xiangrui Meng] remove local hive tests and add more tests 39f19e0 [Xiangrui Meng] add tests b7f666d [Xiangrui Meng] add Python UDT (cherry picked from commit 04450d11548cfb25d4fb77d4a33e3a7cd4254183) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d02db8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d02db8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d02db8 Branch: refs/heads/branch-1.2 Commit: 42d02db86cd973cf31ceeede0c5a723238bbe746 Parents: 0826eed Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 19:29:11 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 19:30:32 2014 -0800 -- python/pyspark/sql.py | 206 ++- python/pyspark/tests.py | 93 - .../spark/sql/catalyst/types/dataTypes.scala| 9 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 + .../apache/spark/sql/execution/pythonUdfs.scala | 5 + .../apache/spark/sql/test/ExamplePointUDT.scala | 64 ++ .../sql/types/util/DataTypeConversions.scala| 1 - 7 files changed, 375 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42d02db8/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 675df08..d16c18b 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -417,6 +417,75 @@ class StructType(DataType): return StructType([StructField.fromJson(f) for f in json[fields]]) +class UserDefinedType(DataType): + +:: WARN: Spark Internal Use Only :: +SQL User-Defined Type (UDT). + + +@classmethod +def typeName(cls): +return cls.__name__.lower() + +@classmethod +def sqlType(cls): + +Underlying SQL storage type for this UDT. + +raise NotImplementedError(UDT must implement sqlType().) + +@classmethod +def module(cls): + +The Python module of the UDT. + +raise NotImplementedError(UDT must implement module().) + +@classmethod +def scalaUDT(cls): + +The class name of the paired Scala UDT. + +raise NotImplementedError(UDT must have a paired Scala UDT.) + +def serialize(self, obj): + +Converts the a user-type object into a SQL datum. + +raise NotImplementedError(UDT must implement serialize().) + +def deserialize(self, datum): + +Converts a SQL datum into a user-type object. + +raise NotImplementedError(UDT must implement deserialize().) 
+ +def json(self): +return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True) + +def jsonValue(self): +schema = { +type: udt, +class: self.scalaUDT(), +pyClass: %s.%s % (self.module(), type(self).__name__), +sqlType: self.sqlType().jsonValue() +} +return schema + +@classmethod +def fromJson(cls, json): +pyUDT = json[pyClass] +split = pyUDT.rfind(.) +pyModule = pyUDT[:split] +pyClass = pyUDT[split+1:] +m = __import__(pyModule, globals(), locals(), [pyClass], -1) +UDT = getattr(m, pyClass) +return UDT() + +def __eq__(self, other): +return type(self) == type(other) + + _all_primitive_types = dict((v.typeName(), v) for v in globals().itervalues() if type(v) is PrimitiveTypeSingleton and @@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string): ...
git commit: [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD
Repository: spark Updated Branches: refs/heads/master 04450d115 - 1a9c6cdda [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD Register MLlib's Vector as a SQL user-defined type (UDT) in both Scala and Python. With this PR, we can easily map a RDD[LabeledPoint] to a SchemaRDD, and then select columns or save to a Parquet file. Examples in Scala/Python are attached. The Scala code was copied from jkbradley. ~~This PR contains the changes from #3068 . I will rebase after #3068 is merged.~~ marmbrus jkbradley Author: Xiangrui Meng m...@databricks.com Closes #3070 from mengxr/SPARK-3573 and squashes the following commits: 3a0b6e5 [Xiangrui Meng] organize imports 236f0a0 [Xiangrui Meng] register vector as UDT and provide dataset examples Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a9c6cdd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a9c6cdd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a9c6cdd Branch: refs/heads/master Commit: 1a9c6cddadebdc53d083ac3e0da276ce979b5d1f Parents: 04450d1 Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 22:29:48 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 22:29:48 2014 -0800 -- dev/run-tests | 2 +- .../src/main/python/mllib/dataset_example.py| 62 ++ .../spark/examples/mllib/DatasetExample.scala | 121 +++ mllib/pom.xml | 5 + .../org/apache/spark/mllib/linalg/Vectors.scala | 69 ++- .../spark/mllib/linalg/VectorsSuite.scala | 11 ++ python/pyspark/mllib/linalg.py | 50 python/pyspark/mllib/tests.py | 39 +- 8 files changed, 353 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a9c6cdd/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index 0e9eefa..de607e4 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -180,7 +180,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS if [ -n $_SQL_TESTS_ONLY ]; then # This must be an array of individual arguments. Otherwise, having one long string #+ will be interpreted as a single test, which doesn't work. -SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test) +SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test mllib/test) else SBT_MAVEN_TEST_ARGS=(test) fi http://git-wip-us.apache.org/repos/asf/spark/blob/1a9c6cdd/examples/src/main/python/mllib/dataset_example.py -- diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/mllib/dataset_example.py new file mode 100644 index 000..540dae7 --- /dev/null +++ b/examples/src/main/python/mllib/dataset_example.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +An example of how to use SchemaRDD as a dataset for ML. 
Run with:: +bin/spark-submit examples/src/main/python/mllib/dataset_example.py + + +import os +import sys +import tempfile +import shutil + +from pyspark import SparkContext +from pyspark.sql import SQLContext +from pyspark.mllib.util import MLUtils +from pyspark.mllib.stat import Statistics + + +def summarize(dataset): +print schema: %s % dataset.schema().json() +labels = dataset.map(lambda r: r.label) +print label average: %f % labels.mean() +features = dataset.map(lambda r: r.features) +summary = Statistics.colStats(features) +print features average: %r % summary.mean() + +if __name__ == __main__: +if len(sys.argv) 2: +print sys.stderr, Usage: dataset_example.py libsvm file +exit(-1) +sc = SparkContext(appName=DatasetExample) +sqlCtx = SQLContext(sc) +if len(sys.argv) == 2: +input = sys.argv[1] +else: +input = data/mllib/sample_libsvm_data.txt +points = MLUtils.loadLibSVMFile(sc, input) +dataset0 =
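With `Vector` registered as a UDT, an `RDD[LabeledPoint]` can be used directly where a SchemaRDD is expected, as the attached examples do. A short Scala sketch against the 1.2-era API, assuming an existing SparkContext `sc` and the bundled sample libsvm file:

~~~
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext._   // implicit conversion from an RDD of case classes to SchemaRDD

val points = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Because Vector is now a UDT, the vector-typed "features" column round-trips through Parquet.
points.saveAsParquetFile("/tmp/labeled_points.parquet")
val loaded = sqlContext.parquetFile("/tmp/labeled_points.parquet")
loaded.printSchema()
println(loaded.count())
~~~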
git commit: [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD
Repository: spark Updated Branches: refs/heads/branch-1.2 42d02db86 - 8395e8fbd [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD Register MLlib's Vector as a SQL user-defined type (UDT) in both Scala and Python. With this PR, we can easily map a RDD[LabeledPoint] to a SchemaRDD, and then select columns or save to a Parquet file. Examples in Scala/Python are attached. The Scala code was copied from jkbradley. ~~This PR contains the changes from #3068 . I will rebase after #3068 is merged.~~ marmbrus jkbradley Author: Xiangrui Meng m...@databricks.com Closes #3070 from mengxr/SPARK-3573 and squashes the following commits: 3a0b6e5 [Xiangrui Meng] organize imports 236f0a0 [Xiangrui Meng] register vector as UDT and provide dataset examples (cherry picked from commit 1a9c6cddadebdc53d083ac3e0da276ce979b5d1f) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8395e8fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8395e8fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8395e8fb Branch: refs/heads/branch-1.2 Commit: 8395e8fbdf23bef286ec68a4bbadcc448b504c2c Parents: 42d02db Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 22:29:48 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 22:31:43 2014 -0800 -- dev/run-tests | 2 +- .../src/main/python/mllib/dataset_example.py| 62 ++ .../spark/examples/mllib/DatasetExample.scala | 121 +++ mllib/pom.xml | 5 + .../org/apache/spark/mllib/linalg/Vectors.scala | 69 ++- .../spark/mllib/linalg/VectorsSuite.scala | 11 ++ python/pyspark/mllib/linalg.py | 50 python/pyspark/mllib/tests.py | 39 +- 8 files changed, 353 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8395e8fb/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index 0e9eefa..de607e4 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -180,7 +180,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS if [ -n $_SQL_TESTS_ONLY ]; then # This must be an array of individual arguments. Otherwise, having one long string #+ will be interpreted as a single test, which doesn't work. -SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test) +SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test mllib/test) else SBT_MAVEN_TEST_ARGS=(test) fi http://git-wip-us.apache.org/repos/asf/spark/blob/8395e8fb/examples/src/main/python/mllib/dataset_example.py -- diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/mllib/dataset_example.py new file mode 100644 index 000..540dae7 --- /dev/null +++ b/examples/src/main/python/mllib/dataset_example.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +An example of how to use SchemaRDD as a dataset for ML. Run with:: +bin/spark-submit examples/src/main/python/mllib/dataset_example.py + + +import os +import sys +import tempfile +import shutil + +from pyspark import SparkContext +from pyspark.sql import SQLContext +from pyspark.mllib.util import MLUtils +from pyspark.mllib.stat import Statistics + + +def summarize(dataset): +print schema: %s % dataset.schema().json() +labels = dataset.map(lambda r: r.label) +print label average: %f % labels.mean() +features = dataset.map(lambda r: r.features) +summary = Statistics.colStats(features) +print features average: %r % summary.mean() + +if __name__ == __main__: +if len(sys.argv) 2: +print sys.stderr, Usage: dataset_example.py libsvm file +exit(-1) +sc = SparkContext(appName=DatasetExample) +sqlCtx = SQLContext(sc) +if len(sys.argv) == 2: +input = sys.argv[1] +else: +
git commit: [SPARK-4163][Core] Add a backward compatibility test for FetchFailed
Repository: spark Updated Branches: refs/heads/master 1a9c6cdda - 9bdc8412a [SPARK-4163][Core] Add a backward compatibility test for FetchFailed /cc aarondav Author: zsxwing zsxw...@gmail.com Closes #3086 from zsxwing/SPARK-4163-back-comp and squashes the following commits: 21cb2a8 [zsxwing] Add a backward compatibility test for FetchFailed Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8412 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8412 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8412 Branch: refs/heads/master Commit: 9bdc8412a0160e06e8182bd8b2f9bb65b478c590 Parents: 1a9c6cd Author: zsxwing zsxw...@gmail.com Authored: Mon Nov 3 22:40:43 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Nov 3 22:40:43 2014 -0800 -- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 11 +++ 1 file changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8412/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a91c9dd..0103012 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -177,6 +177,17 @@ class JsonProtocolSuite extends FunSuite { deserializedBmRemoved) } + test(FetchFailed backwards compatibility) { +// FetchFailed in Spark 1.1.0 does not have an Message property. +val fetchFailed = FetchFailed(BlockManagerId(With or, without you, 15), 17, 18, 19, + ignored) +val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) + .removeField({ _._1 == Message }) +val expectedFetchFailed = FetchFailed(BlockManagerId(With or, without you, 15), 17, 18, 19, + Unknown reason) +assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) + } + test(SparkListenerApplicationStart backwards compatibility) { // SparkListenerApplicationStart in Spark 1.0.0 do not have an appId property. val applicationStart = SparkListenerApplicationStart(test, None, 1L, user) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4166][Core] Add a backward compatibility test for ExecutorLostFailure
Repository: spark Updated Branches: refs/heads/master 9bdc8412a - b671ce047 [SPARK-4166][Core] Add a backward compatibility test for ExecutorLostFailure Author: zsxwing zsxw...@gmail.com Closes #3085 from zsxwing/SPARK-4166-back-comp and squashes the following commits: 89329f4 [zsxwing] Add a backward compatibility test for ExecutorLostFailure Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b671ce04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b671ce04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b671ce04 Branch: refs/heads/master Commit: b671ce047d036b8923007902826038b01e836e8a Parents: 9bdc841 Author: zsxwing zsxw...@gmail.com Authored: Mon Nov 3 22:47:45 2014 -0800 Committer: Andrew Or and...@databricks.com Committed: Mon Nov 3 22:47:45 2014 -0800 -- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b671ce04/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 0103012..aec1e40 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -196,6 +196,15 @@ class JsonProtocolSuite extends FunSuite { assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent)) } + test(ExecutorLostFailure backward compatibility) { +// ExecutorLostFailure in Spark 1.1.0 does not have an Executor ID property. +val executorLostFailure = ExecutorLostFailure(100) +val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure) + .removeField({ _._1 == Executor ID }) +val expectedExecutorLostFailure = ExecutorLostFailure(Unknown) +assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent)) + } + /** -- * | Helper test running methods | * --- */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
Repository: spark
Updated Branches:
  refs/heads/master b671ce047 -> e4f42631a

[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu dav...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Josh Rosen joshro...@databricks.com

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4f42631
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4f42631
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4f42631

Branch: refs/heads/master
Commit: e4f42631a68b473ce706429915f3f08042af2119
Parents: b671ce0
Author: Davies Liu dav...@databricks.com
Authored: Mon Nov 3 23:56:14 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Nov 3 23:56:14 2014 -0800

--
 .../spark/api/python/PythonHadoopUtil.scala | 6 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 110 +
 .../org/apache/spark/api/python/SerDeUtil.scala | 121 ++-
 .../WriteInputFormatTestDataGenerator.scala | 10 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +-
 python/pyspark/context.py | 58 +++--
 python/pyspark/mllib/common.py | 2 +-
 python/pyspark/mllib/recommendation.py | 2 +-
 python/pyspark/rdd.py | 91 ++
 python/pyspark/serializers.py | 36 ++
 python/pyspark/shuffle.py | 7 +-
 python/pyspark/sql.py | 18 ++-
 python/pyspark/tests.py | 66 ++
 .../scala/org/apache/spark/sql/SchemaRDD.scala | 10 +-
 14 files changed, 201 insertions(+), 338 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f..5ba6617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
           map.put(convertWritable(k), convertWritable(v))
         }
         map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125e..e94ccdc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import
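The idea behind the AutoBatchedSerializer referenced in the commit message is to stop asking the user for a fixed batchSize: the serializer grows the batch until each serialized chunk approaches a target byte size, so tiny records get batched aggressively while large records do not blow up memory. The sketch below is a minimal, self-contained illustration of that idea only; the class name, parameters, and framing here are invented for the example and it is not the implementation in python/pyspark/serializers.py.

~~~
import io
import pickle
from itertools import islice

class AutoBatchedPickler:
    """Grow the batch size until each pickled batch nears target_bytes.

    Simplified illustration of auto-batching, not PySpark's own serializer.
    """

    def __init__(self, target_bytes=1 << 16):
        self.target_bytes = target_bytes

    def dump_stream(self, iterator, stream):
        batch_size = 1
        iterator = iter(iterator)
        while True:
            batch = list(islice(iterator, batch_size))
            if not batch:
                break
            payload = pickle.dumps(batch, protocol=2)
            # Length-prefix each batch so the reader can find batch boundaries.
            stream.write(len(payload).to_bytes(4, "big"))
            stream.write(payload)
            # Double the batch size while batches stay under the target, so
            # small records do not pay per-record serialization overhead.
            if len(payload) < self.target_bytes:
                batch_size *= 2

    def load_stream(self, stream):
        while True:
            header = stream.read(4)
            if not header:
                return
            payload = stream.read(int.from_bytes(header, "big"))
            for item in pickle.loads(payload):
                yield item

# Round-trip check: items come back unchanged whatever batch sizes were chosen.
buf = io.BytesIO()
AutoBatchedPickler().dump_stream(range(1000), buf)
buf.seek(0)
assert list(AutoBatchedPickler().load_stream(buf)) == list(range(1000))
~~~

Length-prefixing each batch, as in the sketch, lets the reading side recover batch boundaries without ever knowing which batch sizes the writer settled on.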
git commit: [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 8395e8fbd -> 786e75b33

[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu dav...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Josh Rosen joshro...@databricks.com

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.

(cherry picked from commit e4f42631a68b473ce706429915f3f08042af2119)
Signed-off-by: Josh Rosen joshro...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/786e75b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/786e75b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/786e75b3

Branch: refs/heads/branch-1.2
Commit: 786e75b33f0bc1445bfc289fe4b62407cb79026e
Parents: 8395e8f
Author: Davies Liu dav...@databricks.com
Authored: Mon Nov 3 23:56:14 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Nov 3 23:56:37 2014 -0800

--
 .../spark/api/python/PythonHadoopUtil.scala | 6 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 110 +
 .../org/apache/spark/api/python/SerDeUtil.scala | 121 ++-
 .../WriteInputFormatTestDataGenerator.scala | 10 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 2 +-
 python/pyspark/context.py | 58 +++--
 python/pyspark/mllib/common.py | 2 +-
 python/pyspark/mllib/recommendation.py | 2 +-
 python/pyspark/rdd.py | 91 ++
 python/pyspark/serializers.py | 36 ++
 python/pyspark/shuffle.py | 7 +-
 python/pyspark/sql.py | 18 ++-
 python/pyspark/tests.py | 66 ++
 .../scala/org/apache/spark/sql/SchemaRDD.scala | 10 +-
 14 files changed, 201 insertions(+), 338 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/786e75b3/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f..5ba6617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
           map.put(convertWritable(k), convertWritable(v))
         }
         map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
      case other => other
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/786e75b3/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125e..e94ccdc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList =