Git Push Summary

2014-11-03 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 [created] 76386e1a2

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2014-11-03 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.1.0-rc4 [deleted] 5918ea4c9

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [EC2] Factor out Mesos spark-ec2 branch

2014-11-03 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 76386e1a2 -> 2aca97c7c


[EC2] Factor out Mesos spark-ec2 branch

We reference a specific branch in two places. This patch makes it one place.

Author: Nicholas Chammas nicholas.cham...@gmail.com

Closes #3008 from nchammas/mesos-spark-ec2-branch and squashes the following 
commits:

10a6089 [Nicholas Chammas] factor out mesos spark-ec2 branch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aca97c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aca97c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aca97c7

Branch: refs/heads/master
Commit: 2aca97c7cfdefea8b6f9dbb88951e9acdfd606d9
Parents: 76386e1
Author: Nicholas Chammas nicholas.cham...@gmail.com
Authored: Mon Nov 3 09:02:35 2014 -0800
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Nov 3 09:02:35 2014 -0800

--
 ec2/spark_ec2.py | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2aca97c7/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0d6b82b..50f88f7 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -41,8 +41,9 @@ from boto import ec2
 
 DEFAULT_SPARK_VERSION = "1.1.0"
 
+MESOS_SPARK_EC2_BRANCH = "v4"
 # A URL prefix from which to fetch AMI information
-AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
+AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
 
 
 class UsageError(Exception):
@@ -583,7 +584,13 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
 
     # NOTE: We should clone the repository before running deploy_files to
     # prevent ec2-variables.sh from being overwritten
-    ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v4")
+    ssh(
+        host=master,
+        opts=opts,
+        command="rm -rf spark-ec2"
+        + " && "
+        + "git clone https://github.com/mesos/spark-ec2.git -b {b}".format(b=MESOS_SPARK_EC2_BRANCH)
+    )
 
     print "Deploying files to master..."
     deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 2aca97c7c -> 3cca19622


[SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

The current way of seed distribution makes the random sequences from partition 
i and i+1 offset by 1.

~~~
In [14]: import random

In [15]: r1 = random.Random(10)

In [16]: r1.randint(0, 1)
Out[16]: 1

In [17]: r1.random()
Out[17]: 0.4288890546751146

In [18]: r1.random()
Out[18]: 0.5780913011344704

In [19]: r2 = random.Random(10)

In [20]: r2.randint(0, 1)
Out[20]: 1

In [21]: r2.randint(0, 1)
Out[21]: 0

In [22]: r2.random()
Out[22]: 0.5780913011344704
~~~

Note: The new tests are not for this bug fix.
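A minimal standalone sketch (plain `random`, no Spark; `stream_old`/`stream_new` and the simplified discard in the old path are illustrative, not the exact RDDSampler code) of why the old per-partition seeding overlaps and how the XOR-plus-mixing scheme in the diff below avoids it:

```python
import random

def stream_old(seed, split, n=3):
    # Old scheme (simplified): every partition uses the same seed and just
    # discards `split` leading draws, so partition i+1 sees partition i's
    # sequence shifted by one value.
    rng = random.Random(seed)
    for _ in range(split):
        rng.random()
    return [rng.random() for _ in range(n)]

def stream_new(seed, split, n=3):
    # New scheme: XOR the partition index into the seed, then burn a few
    # draws because numerically close seeds start out correlated.
    rng = random.Random(seed ^ split)
    for _ in range(10):
        rng.randint(0, 1)
    return [rng.random() for _ in range(n)]

print(stream_old(10, 1))  # e.g. [a, b, c]
print(stream_old(10, 2))  # [b, c, d] -- the same values, shifted by one
print(stream_new(10, 1))
print(stream_new(10, 2))  # unrelated to the line above
```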

Author: Xiangrui Meng m...@databricks.com

Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:

869ae4b [Xiangrui Meng] move tests tests.py
c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cca1962
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cca1962
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cca1962

Branch: refs/heads/master
Commit: 3cca1962207745814b9d83e791713c91b659c36c
Parents: 2aca97c
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 12:24:24 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 12:24:24 2014 -0800

--
 python/pyspark/rdd.py|  3 ---
 python/pyspark/rddsampler.py | 11 +--
 python/pyspark/tests.py  | 15 +++
 3 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 550c9dd..4f025b9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -316,9 +316,6 @@ class RDD(object):
         """
         Return a sampled subset of this RDD (relies on numpy and falls back
         on default random generator if numpy is unavailable).
-
-        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
-        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/rddsampler.py
--
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 528a181..f5c3cfd 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -40,14 +40,13 @@ class RDDSamplerBase(object):
 def initRandomGenerator(self, split):
 if self._use_numpy:
 import numpy
-self._random = numpy.random.RandomState(self._seed)
+self._random = numpy.random.RandomState(self._seed ^ split)
 else:
-self._random = random.Random(self._seed)
+self._random = random.Random(self._seed ^ split)
 
-for _ in range(0, split):
-# discard the next few values in the sequence to have a
-# different seed for the different splits
-self._random.randint(0, 2 ** 32 - 1)
+# mixing because the initial seeds are close to each other
+for _ in xrange(10):
+self._random.randint(0, 1)
 
 self._split = split
 self._rand_initialized = True

http://git-wip-us.apache.org/repos/asf/spark/blob/3cca1962/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 37a1289..253a471 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase):
 self.assertEquals(result.getNumPartitions(), 5)
 self.assertEquals(result.count(), 3)
 
+def test_sample(self):
+rdd = self.sc.parallelize(range(0, 100), 4)
+wo = rdd.sample(False, 0.1, 2).collect()
+wo_dup = rdd.sample(False, 0.1, 2).collect()
+self.assertSetEqual(set(wo), set(wo_dup))
+wr = rdd.sample(True, 0.2, 5).collect()
+wr_dup = rdd.sample(True, 0.2, 5).collect()
+self.assertSetEqual(set(wr), set(wr_dup))
+wo_s10 = rdd.sample(False, 0.3, 10).collect()
+wo_s20 = rdd.sample(False, 0.3, 20).collect()
+self.assertNotEqual(set(wo_s10), set(wo_s20))
+wr_s11 = rdd.sample(True, 0.4, 11).collect()
+wr_s21 = rdd.sample(True, 0.4, 21).collect()
+self.assertNotEqual(set(wr_s11), set(wr_s21))
+
 
 class ProfilerTests(PySparkTestCase):
 



git commit: [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 76386e1a2 -> a68321400


[SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample

The current way of seed distribution makes the random sequences from partition 
i and i+1 offset by 1.

~~~
In [14]: import random

In [15]: r1 = random.Random(10)

In [16]: r1.randint(0, 1)
Out[16]: 1

In [17]: r1.random()
Out[17]: 0.4288890546751146

In [18]: r1.random()
Out[18]: 0.5780913011344704

In [19]: r2 = random.Random(10)

In [20]: r2.randint(0, 1)
Out[20]: 1

In [21]: r2.randint(0, 1)
Out[21]: 0

In [22]: r2.random()
Out[22]: 0.5780913011344704
~~~

Note: The new tests are not for this bug fix.

Author: Xiangrui Meng m...@databricks.com

Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:

869ae4b [Xiangrui Meng] move tests tests.py
c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample

(cherry picked from commit 3cca1962207745814b9d83e791713c91b659c36c)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6832140
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6832140
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6832140

Branch: refs/heads/branch-1.2
Commit: a68321400c1068449698d03cebd0fbf648627133
Parents: 76386e1
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 12:24:24 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 12:24:47 2014 -0800

--
 python/pyspark/rdd.py|  3 ---
 python/pyspark/rddsampler.py | 11 +--
 python/pyspark/tests.py  | 15 +++
 3 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 550c9dd..4f025b9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -316,9 +316,6 @@ class RDD(object):
         """
         Return a sampled subset of this RDD (relies on numpy and falls back
         on default random generator if numpy is unavailable).
-
-        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
-        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/rddsampler.py
--
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 528a181..f5c3cfd 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -40,14 +40,13 @@ class RDDSamplerBase(object):
 def initRandomGenerator(self, split):
 if self._use_numpy:
 import numpy
-self._random = numpy.random.RandomState(self._seed)
+self._random = numpy.random.RandomState(self._seed ^ split)
 else:
-self._random = random.Random(self._seed)
+self._random = random.Random(self._seed ^ split)
 
-for _ in range(0, split):
-# discard the next few values in the sequence to have a
-# different seed for the different splits
-self._random.randint(0, 2 ** 32 - 1)
+# mixing because the initial seeds are close to each other
+for _ in xrange(10):
+self._random.randint(0, 1)
 
 self._split = split
 self._rand_initialized = True

http://git-wip-us.apache.org/repos/asf/spark/blob/a6832140/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 37a1289..253a471 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -648,6 +648,21 @@ class RDDTests(ReusedPySparkTestCase):
 self.assertEquals(result.getNumPartitions(), 5)
 self.assertEquals(result.count(), 3)
 
+def test_sample(self):
+rdd = self.sc.parallelize(range(0, 100), 4)
+wo = rdd.sample(False, 0.1, 2).collect()
+wo_dup = rdd.sample(False, 0.1, 2).collect()
+self.assertSetEqual(set(wo), set(wo_dup))
+wr = rdd.sample(True, 0.2, 5).collect()
+wr_dup = rdd.sample(True, 0.2, 5).collect()
+self.assertSetEqual(set(wr), set(wr_dup))
+wo_s10 = rdd.sample(False, 0.3, 10).collect()
+wo_s20 = rdd.sample(False, 0.3, 20).collect()
+self.assertNotEqual(set(wo_s10), set(wo_s20))
+wr_s11 = rdd.sample(True, 0.4, 11).collect()
+wr_s21 = rdd.sample(True, 0.4, 

git commit: [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a68321400 -> fc782896b


[SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1

instead of `hive.version=0.13.1`.
e.g. mvn -Phive -Phive=0.13.1

Note: `hive.version=0.13.1a` is the default property value. However, when 
explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be 
selected.
References:  PR #2685, which resolved a package incompatibility issue with 
Hive-0.13.1 by introducing a special version Hive-0.13.1a

Author: fi code...@gmail.com

Closes #3072 from coderfi/master and squashes the following commits:

7ca4b1e [fi] Fixes the `hive-0.13.1` maven profile referencing 
`hive.version=0.13.1` instead of the Spark compatible `hive.version=0.13.1a` 
Note: `hive.version=0.13.1a` is the default version. However, when explicitly 
specifying the `hive-0.13.1` maven profile, the wrong one would be selected. 
e.g. mvn -Phive -Phive=0.13.1 See PR #2685

(cherry picked from commit df607da025488d6c924d3d70eddb67f5523080d3)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc782896
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc782896
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc782896

Branch: refs/heads/branch-1.2
Commit: fc782896b5d51161feee950107df2acf17e12422
Parents: a683214
Author: fi code...@gmail.com
Authored: Mon Nov 3 12:56:56 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 12:57:09 2014 -0800

--
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc782896/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 6191cd3..eb61353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1359,7 +1359,7 @@
         <activeByDefault>false</activeByDefault>
       </activation>
       <properties>
-        <hive.version>0.13.1</hive.version>
+        <hive.version>0.13.1a</hive.version>
         <hive.version.short>0.13.1</hive.version.short>
         <derby.version>10.10.1.1</derby.version>
       </properties>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 3cca19622 -> df607da02


[SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1

instead of `hive.version=0.13.1`.
e.g. mvn -Phive -Phive=0.13.1

Note: `hive.version=0.13.1a` is the default property value. However, when 
explicitly specifying the `hive-0.13.1` maven profile, the wrong one would be 
selected.
References:  PR #2685, which resolved a package incompatibility issue with 
Hive-0.13.1 by introducing a special version Hive-0.13.1a

Author: fi code...@gmail.com

Closes #3072 from coderfi/master and squashes the following commits:

7ca4b1e [fi] Fixes the `hive-0.13.1` maven profile referencing 
`hive.version=0.13.1` instead of the Spark compatible `hive.version=0.13.1a` 
Note: `hive.version=0.13.1a` is the default version. However, when explicitly 
specifying the `hive-0.13.1` maven profile, the wrong one would be selected. 
e.g. mvn -Phive -Phive=0.13.1 See PR #2685


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df607da0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df607da0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df607da0

Branch: refs/heads/master
Commit: df607da025488d6c924d3d70eddb67f5523080d3
Parents: 3cca196
Author: fi code...@gmail.com
Authored: Mon Nov 3 12:56:56 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 12:56:56 2014 -0800

--
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/df607da0/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 6191cd3..eb61353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1359,7 +1359,7 @@
         <activeByDefault>false</activeByDefault>
       </activation>
       <properties>
-        <hive.version>0.13.1</hive.version>
+        <hive.version>0.13.1a</hive.version>
         <hive.version.short>0.13.1</hive.version.short>
         <derby.version>10.10.1.1</derby.version>
       </properties>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 fc782896b -> 292da4ef2


[SPARK-4207][SQL] Query which has syntax like 'not like' is not working in 
Spark SQL

Queries which have 'not like' are not working in Spark SQL.

sql("SELECT * FROM records where value not like 'val%'")
The same query works in Spark HiveQL.
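For context, a hedged PySpark sketch of the query shape that the parser change below makes work with the plain `sql` dialect; the table and column names come from the report above, and the setup around them (SparkContext, inferSchema) is illustrative only:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="not-like-demo")
sqlContext = SQLContext(sc)

# Register a tiny 'records' table with a string 'value' column.
records = sc.parallelize([Row(value="val_1"), Row(value="other")])
sqlContext.inferSchema(records).registerTempTable("records")

# Previously this parsed only through HiveQL; with the fix it parses in Spark SQL too.
print(sqlContext.sql("SELECT * FROM records WHERE value NOT LIKE 'val%'").collect())
```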

Author: ravipesala ravindra.pes...@huawei.com

Closes #3075 from ravipesala/SPARK-4207 and squashes the following commits:

35c11e7 [ravipesala] Supported 'not like' syntax in sql

(cherry picked from commit 2b6e1ce6ee7b1ba8160bcbee97f5bbff5c46ca09)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/292da4ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/292da4ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/292da4ef

Branch: refs/heads/branch-1.2
Commit: 292da4ef25d6cce23bfde7b9ab663a574dfd2b00
Parents: fc78289
Author: ravipesala ravindra.pes...@huawei.com
Authored: Mon Nov 3 13:07:41 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:07:56 2014 -0800

--
 .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala| 1 +
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 +
 2 files changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/292da4ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 00fc4d7..5e613e0 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -242,6 +242,7 @@ class SqlParser extends AbstractSparkSQLParser {
     | termExpression ~ (RLIKE  ~> termExpression) ^^ { case e1 ~ e2 => RLike(e1, e2) }
     | termExpression ~ (REGEXP ~> termExpression) ^^ { case e1 ~ e2 => RLike(e1, e2) }
     | termExpression ~ (LIKE   ~> termExpression) ^^ { case e1 ~ e2 => Like(e1, e2) }
+    | termExpression ~ (NOT ~ LIKE ~> termExpression) ^^ { case e1 ~ e2 => Not(Like(e1, e2)) }
     | termExpression ~ (IN ~ "(" ~> rep1sep(termExpression, ",")) <~ ")" ^^ {
         case e1 ~ e2 => In(e1, e2)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/292da4ef/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6bf4393..702714a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -938,4 +938,9 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"),
       (11 to 100).map(i => Seq(i)))
   }
+
+  test("SPARK-4207 Query which has syntax like 'not like' is not working in Spark SQL") {
+    checkAnswer(sql("SELECT key FROM testData WHERE value not like '100%' order by key"),
+      (1 to 99).map(i => Seq(i)))
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-4207][SQL] Query which has syntax like 'not like' is not working in Spark SQL

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master df607da02 -> 2b6e1ce6e


[SPARK-4207][SQL] Query which has syntax like 'not like' is not working in 
Spark SQL

Queries which have 'not like' are not working in Spark SQL.

sql("SELECT * FROM records where value not like 'val%'")
The same query works in Spark HiveQL.

Author: ravipesala ravindra.pes...@huawei.com

Closes #3075 from ravipesala/SPARK-4207 and squashes the following commits:

35c11e7 [ravipesala] Supported 'not like' syntax in sql


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b6e1ce6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b6e1ce6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b6e1ce6

Branch: refs/heads/master
Commit: 2b6e1ce6ee7b1ba8160bcbee97f5bbff5c46ca09
Parents: df607da
Author: ravipesala ravindra.pes...@huawei.com
Authored: Mon Nov 3 13:07:41 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:07:41 2014 -0800

--
 .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala| 1 +
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 +
 2 files changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b6e1ce6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 00fc4d7..5e613e0 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -242,6 +242,7 @@ class SqlParser extends AbstractSparkSQLParser {
     | termExpression ~ (RLIKE  ~> termExpression) ^^ { case e1 ~ e2 => RLike(e1, e2) }
     | termExpression ~ (REGEXP ~> termExpression) ^^ { case e1 ~ e2 => RLike(e1, e2) }
     | termExpression ~ (LIKE   ~> termExpression) ^^ { case e1 ~ e2 => Like(e1, e2) }
+    | termExpression ~ (NOT ~ LIKE ~> termExpression) ^^ { case e1 ~ e2 => Not(Like(e1, e2)) }
     | termExpression ~ (IN ~ "(" ~> rep1sep(termExpression, ",")) <~ ")" ^^ {
         case e1 ~ e2 => In(e1, e2)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b6e1ce6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6bf4393..702714a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -938,4 +938,9 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"),
       (11 to 100).map(i => Seq(i)))
   }
+
+  test("SPARK-4207 Query which has syntax like 'not like' is not working in Spark SQL") {
+    checkAnswer(sql("SELECT key FROM testData WHERE value not like '100%' order by key"),
+      (1 to 99).map(i => Seq(i)))
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 292da4ef2 -> cc5dc4247


[SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling

This patch tries to infer the schema for an RDD whose first row contains empty
values (None, [], {}). It looks at the first 100 rows and merges their types into
one schema, also merging the fields of StructTypes together. If the resulting
schema still contains NullType, it prints a warning telling the user to retry with sampling.

If a sampling ratio is provided, the schema is inferred from all of the sampled rows.

Also, adds samplingRatio to jsonFile() and jsonRDD()
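A rough standalone sketch of the "scan the first rows and merge their types" idea described above; `NullType`, `_infer_type`, and `_merge_type` here are simplified stand-ins for illustration, not the actual PySpark helpers:

```python
class NullType(object):
    """Placeholder for a value whose type is still unknown (e.g. None)."""

def _infer_type(value):
    # Toy inference: None maps to the NullType placeholder, anything else to its type.
    return NullType if value is None else type(value)

def _merge_type(a, b):
    # NullType acts as "unknown" and is replaced by any concrete type it meets.
    if a is NullType:
        return b
    if b is NullType:
        return a
    if a is not b:
        raise TypeError("incompatible types: %r and %r" % (a, b))
    return a

def infer_schema(rows, sample=100):
    schema = {}
    for row in rows[:sample]:
        for key, value in row.items():
            schema[key] = _merge_type(schema.get(key, NullType), _infer_type(value))
    if any(t is NullType for t in schema.values()):
        print("warning: some fields could not be typed; retry with a sampling ratio")
    return schema

# The first row alone would leave 'a' untyped; the second row fills it in.
print(infer_schema([{"a": None, "b": 1}, {"a": "x", "b": 2}]))
```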

Author: Davies Liu davies@gmail.com
Author: Davies Liu dav...@databricks.com

Closes #2716 from davies/infer and squashes the following commits:

e678f6d [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
34b5c63 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
567dc60 [Davies Liu] update docs
9767b27 [Davies Liu] Merge branch 'master' into infer
e48d7fb [Davies Liu] fix tests
29e94d5 [Davies Liu] let NullType inherit from PrimitiveType
ee5d524 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
540d1d5 [Davies Liu] merge fields for StructType
f93fd84 [Davies Liu] add more tests
3603e00 [Davies Liu] take more rows to infer schema, or infer the schema by 
sampling the RDD

(cherry picked from commit 24544fbce05665ab4999a1fe5aac434d29cd912c)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc5dc424
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc5dc424
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc5dc424

Branch: refs/heads/branch-1.2
Commit: cc5dc4247979dc001302f7af978801b789acdbfa
Parents: 292da4e
Author: Davies Liu davies@gmail.com
Authored: Mon Nov 3 13:17:09 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:17:25 2014 -0800

--
 python/pyspark/sql.py   | 196 ---
 python/pyspark/tests.py |  19 ++
 .../spark/sql/catalyst/types/dataTypes.scala|   2 +-
 3 files changed, 148 insertions(+), 69 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cc5dc424/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 98e41f8..675df08 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -109,6 +109,15 @@ class PrimitiveType(DataType):
         return self is other
 
 
+class NullType(PrimitiveType):
+
+    """Spark SQL NullType
+
+    The data type representing None, used for the types which has not
+    been inferred.
+    """
+
+
 class StringType(PrimitiveType):
 
     """Spark SQL StringType
@@ -331,7 +340,7 @@ class StructField(DataType):
 
     """
 
-    def __init__(self, name, dataType, nullable, metadata=None):
+    def __init__(self, name, dataType, nullable=True, metadata=None):
         """Creates a StructField
         :param name: the name of this field.
         :param dataType: the data type of this field.
@@ -484,6 +493,7 @@ def _parse_datatype_json_value(json_value):
 
 # Mapping Python types to Spark SQL DataType
 _type_mappings = {
+    type(None): NullType,
     bool: BooleanType,
     int: IntegerType,
     long: LongType,
@@ -500,22 +510,22 @@ _type_mappings = {
 
 def _infer_type(obj):
     """Infer the DataType from obj"""
-    if obj is None:
-        raise ValueError("Can not infer type for None")
-
     dataType = _type_mappings.get(type(obj))
     if dataType is not None:
         return dataType()
 
     if isinstance(obj, dict):
-        if not obj:
-            raise ValueError("Can not infer type for empty dict")
-        key, value = obj.iteritems().next()
-        return MapType(_infer_type(key), _infer_type(value), True)
+        for key, value in obj.iteritems():
+            if key is not None and value is not None:
+                return MapType(_infer_type(key), _infer_type(value), True)
+        else:
+            return MapType(NullType(), NullType(), True)
     elif isinstance(obj, (list, array)):
-        if not obj:
-            raise ValueError("Can not infer type for empty list/array")
-        return ArrayType(_infer_type(obj[0]), True)
+        for v in obj:
+            if v is not None:
+                return ArrayType(_infer_type(obj[0]), True)
+        else:
+            return ArrayType(NullType(), True)
     else:
         try:
             return _infer_schema(obj)
@@ -548,60 +558,93 @@ def _infer_schema(row):
     return StructType(fields)
 
 
-def _create_converter(obj, dataType):
+def _has_nulltype(dt):
+    """ Return whether there is NullType in `dt` or not """
+    if isinstance(dt, StructType):
+        return 

git commit: [SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 2b6e1ce6e -> 24544fbce


[SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling

This patch tries to infer the schema for an RDD whose first row contains empty
values (None, [], {}). It looks at the first 100 rows and merges their types into
one schema, also merging the fields of StructTypes together. If the resulting
schema still contains NullType, it prints a warning telling the user to retry with sampling.

If a sampling ratio is provided, the schema is inferred from all of the sampled rows.

Also, adds samplingRatio to jsonFile() and jsonRDD()

Author: Davies Liu davies@gmail.com
Author: Davies Liu dav...@databricks.com

Closes #2716 from davies/infer and squashes the following commits:

e678f6d [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
34b5c63 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
567dc60 [Davies Liu] update docs
9767b27 [Davies Liu] Merge branch 'master' into infer
e48d7fb [Davies Liu] fix tests
29e94d5 [Davies Liu] let NullType inherit from PrimitiveType
ee5d524 [Davies Liu] Merge branch 'master' of github.com:apache/spark into infer
540d1d5 [Davies Liu] merge fields for StructType
f93fd84 [Davies Liu] add more tests
3603e00 [Davies Liu] take more rows to infer schema, or infer the schema by 
sampling the RDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24544fbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24544fbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24544fbc

Branch: refs/heads/master
Commit: 24544fbce05665ab4999a1fe5aac434d29cd912c
Parents: 2b6e1ce
Author: Davies Liu davies@gmail.com
Authored: Mon Nov 3 13:17:09 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:17:09 2014 -0800

--
 python/pyspark/sql.py   | 196 ---
 python/pyspark/tests.py |  19 ++
 .../spark/sql/catalyst/types/dataTypes.scala|   2 +-
 3 files changed, 148 insertions(+), 69 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24544fbc/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 98e41f8..675df08 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -109,6 +109,15 @@ class PrimitiveType(DataType):
         return self is other
 
 
+class NullType(PrimitiveType):
+
+    """Spark SQL NullType
+
+    The data type representing None, used for the types which has not
+    been inferred.
+    """
+
+
 class StringType(PrimitiveType):
 
     """Spark SQL StringType
@@ -331,7 +340,7 @@ class StructField(DataType):
 
     """
 
-    def __init__(self, name, dataType, nullable, metadata=None):
+    def __init__(self, name, dataType, nullable=True, metadata=None):
         """Creates a StructField
         :param name: the name of this field.
         :param dataType: the data type of this field.
@@ -484,6 +493,7 @@ def _parse_datatype_json_value(json_value):
 
 # Mapping Python types to Spark SQL DataType
 _type_mappings = {
+    type(None): NullType,
     bool: BooleanType,
     int: IntegerType,
     long: LongType,
@@ -500,22 +510,22 @@ _type_mappings = {
 
 def _infer_type(obj):
     """Infer the DataType from obj"""
-    if obj is None:
-        raise ValueError("Can not infer type for None")
-
     dataType = _type_mappings.get(type(obj))
     if dataType is not None:
         return dataType()
 
     if isinstance(obj, dict):
-        if not obj:
-            raise ValueError("Can not infer type for empty dict")
-        key, value = obj.iteritems().next()
-        return MapType(_infer_type(key), _infer_type(value), True)
+        for key, value in obj.iteritems():
+            if key is not None and value is not None:
+                return MapType(_infer_type(key), _infer_type(value), True)
+        else:
+            return MapType(NullType(), NullType(), True)
    elif isinstance(obj, (list, array)):
-        if not obj:
-            raise ValueError("Can not infer type for empty list/array")
-        return ArrayType(_infer_type(obj[0]), True)
+        for v in obj:
+            if v is not None:
+                return ArrayType(_infer_type(obj[0]), True)
+        else:
+            return ArrayType(NullType(), True)
     else:
         try:
             return _infer_schema(obj)
@@ -548,60 +558,93 @@ def _infer_schema(row):
     return StructType(fields)
 
 
-def _create_converter(obj, dataType):
+def _has_nulltype(dt):
+    """ Return whether there is NullType in `dt` or not """
+    if isinstance(dt, StructType):
+        return any(_has_nulltype(f.dataType) for f in dt.fields)
+    elif isinstance(dt, ArrayType):
+        return _has_nulltype((dt.elementType))
+   

git commit: [SPARK-4202][SQL] Simple DSL support for Scala UDF

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 24544fbce -> c238fb423


[SPARK-4202][SQL] Simple DSL support for Scala UDF

This feature is based on an offline discussion with mengxr, hopefully can be 
useful for the new MLlib pipeline API.

For the following test snippet

```scala
case class KeyValue(key: Int, value: String)
val testData = sc.parallelize(1 to 10).map(i => KeyValue(i, i.toString)).toSchemaRDD
def foo(a: Int, b: String) = a.toString + b
```

the newly introduced DSL enables the following syntax

```scala
import org.apache.spark.sql.catalyst.dsl._
testData.select(Star(None), foo.call('key, 'value) as 'result)
```

which is equivalent to

```scala
testData.registerTempTable("testData")
sqlContext.registerFunction("foo", foo)
sql("SELECT *, foo(key, value) AS result FROM testData")
```

Author: Cheng Lian l...@databricks.com

Closes #3067 from liancheng/udf-dsl and squashes the following commits:

f132818 [Cheng Lian] Adds DSL support for Scala UDF


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c238fb42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c238fb42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c238fb42

Branch: refs/heads/master
Commit: c238fb423d1011bd1b1e6201d769b72e52664fc6
Parents: 24544fb
Author: Cheng Lian l...@databricks.com
Authored: Mon Nov 3 13:20:33 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:20:33 2014 -0800

--
 .../apache/spark/sql/catalyst/dsl/package.scala | 59 
 .../org/apache/spark/sql/DslQuerySuite.scala| 17 --
 2 files changed, 72 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c238fb42/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 7e6d770..3314e15 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -22,6 +22,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.language.implicitConversions
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
@@ -285,4 +286,62 @@ package object dsl {
   def writeToFile(path: String) = WriteToFile(path, logicalPlan)
 }
   }
+
+  case class ScalaUdfBuilder[T: TypeTag](f: AnyRef) {
+def call(args: Expression*) = ScalaUdf(f, 
ScalaReflection.schemaFor(typeTag[T]).dataType, args)
+  }
+
+  // scalastyle:off
+  /** functionToUdfBuilder 1-22 were generated by this script
+
+    (1 to 22).map { x =>
+      val argTypes = Seq.fill(x)("_").mkString(", ")
+      s"implicit def functionToUdfBuilder[T: TypeTag](func: Function$x[$argTypes, T]) = ScalaUdfBuilder(func)"
+    }
+  */
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function2[_, _, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function3[_, _, _, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function4[_, _, _, _, 
T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function5[_, _, _, _, _, 
T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function6[_, _, _, _, _, 
_, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function7[_, _, _, _, _, 
_, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function8[_, _, _, _, _, 
_, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function9[_, _, _, _, _, 
_, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function10[_, _, _, _, 
_, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function11[_, _, _, _, 
_, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function12[_, _, _, _, 
_, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function13[_, _, _, _, 
_, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function14[_, _, _, _, 
_, _, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def 

git commit: [SPARK-4202][SQL] Simple DSL support for Scala UDF

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 cc5dc4247 -> 572300ba8


[SPARK-4202][SQL] Simple DSL support for Scala UDF

This feature is based on an offline discussion with mengxr, hopefully can be 
useful for the new MLlib pipeline API.

For the following test snippet

```scala
case class KeyValue(key: Int, value: String)
val testData = sc.parallelize(1 to 10).map(i => KeyValue(i, i.toString)).toSchemaRDD
def foo(a: Int, b: String) = a.toString + b
```

the newly introduced DSL enables the following syntax

```scala
import org.apache.spark.sql.catalyst.dsl._
testData.select(Star(None), foo.call('key, 'value) as 'result)
```

which is equivalent to

```scala
testData.registerTempTable("testData")
sqlContext.registerFunction("foo", foo)
sql("SELECT *, foo(key, value) AS result FROM testData")
```

Author: Cheng Lian l...@databricks.com

Closes #3067 from liancheng/udf-dsl and squashes the following commits:

f132818 [Cheng Lian] Adds DSL support for Scala UDF

(cherry picked from commit c238fb423d1011bd1b1e6201d769b72e52664fc6)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/572300ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/572300ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/572300ba

Branch: refs/heads/branch-1.2
Commit: 572300ba8a5f24b52f19d7033a456248da20bfed
Parents: cc5dc42
Author: Cheng Lian l...@databricks.com
Authored: Mon Nov 3 13:20:33 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:20:51 2014 -0800

--
 .../apache/spark/sql/catalyst/dsl/package.scala | 59 
 .../org/apache/spark/sql/DslQuerySuite.scala| 17 --
 2 files changed, 72 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/572300ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 7e6d770..3314e15 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -22,6 +22,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.language.implicitConversions
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
@@ -285,4 +286,62 @@ package object dsl {
   def writeToFile(path: String) = WriteToFile(path, logicalPlan)
 }
   }
+
+  case class ScalaUdfBuilder[T: TypeTag](f: AnyRef) {
+def call(args: Expression*) = ScalaUdf(f, 
ScalaReflection.schemaFor(typeTag[T]).dataType, args)
+  }
+
+  // scalastyle:off
+  /** functionToUdfBuilder 1-22 were generated by this script
+
+    (1 to 22).map { x =>
+      val argTypes = Seq.fill(x)("_").mkString(", ")
+      s"implicit def functionToUdfBuilder[T: TypeTag](func: Function$x[$argTypes, T]) = ScalaUdfBuilder(func)"
+    }
+  */
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function2[_, _, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function3[_, _, _, T]) = 
ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function4[_, _, _, _, 
T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function5[_, _, _, _, _, 
T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function6[_, _, _, _, _, 
_, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function7[_, _, _, _, _, 
_, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function8[_, _, _, _, _, 
_, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function9[_, _, _, _, _, 
_, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function10[_, _, _, _, 
_, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function11[_, _, _, _, 
_, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function12[_, _, _, _, 
_, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def functionToUdfBuilder[T: TypeTag](func: Function13[_, _, _, _, 
_, _, _, _, _, _, _, _, _, T]) = ScalaUdfBuilder(func)
+
+  implicit def 

git commit: [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master c238fb423 -> e83f13e8d


[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed

CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- should throw an exception
CREATE TABLE if not exists t1 AS SELECT key FROM src; -- expected to do nothing;
currently it overwrites t1, which is incorrect.
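As a hypothetical illustration of the semantics this patch enforces (plain Python with made-up catalog helpers, not Spark's Catalog API):

```python
class InMemoryCatalog(object):
    """Toy catalog, only here to exercise the sketch."""
    def __init__(self):
        self.tables = {}

    def table_exists(self, name):
        return name in self.tables

    def create_table(self, name):
        self.tables[name] = []

    def insert_from_query(self, name, rows):
        self.tables[name].extend(rows)


def create_table_as_select(catalog, name, rows, if_not_exists=False):
    # Never overwrite an existing table: IF NOT EXISTS is a no-op, plain CTAS errors out.
    if catalog.table_exists(name):
        if if_not_exists:
            return
        raise RuntimeError("table %r already exists" % name)
    catalog.create_table(name)
    catalog.insert_from_query(name, rows)


catalog = InMemoryCatalog()
catalog.create_table("t1")                                          # CREATE TABLE t1 (a String)
create_table_as_select(catalog, "t1", [1, 2], if_not_exists=True)   # leaves t1 untouched
try:
    create_table_as_select(catalog, "t1", [1, 2])                   # plain CTAS on an existing table
except RuntimeError as e:
    print(e)
```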

Author: Cheng Hao hao.ch...@intel.com

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following 
commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e83f13e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e83f13e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e83f13e8

Branch: refs/heads/master
Commit: e83f13e8d37ca33f4e183e977d077221b90c6025
Parents: c238fb4
Author: Cheng Hao hao.ch...@intel.com
Authored: Mon Nov 3 13:59:43 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 13:59:43 2014 -0800

--
 .../spark/sql/catalyst/analysis/Catalog.scala   | 22 
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 ++
 .../hive/execution/CreateTableAsSelect.scala| 12 ++-
 .../sql/hive/execution/SQLQuerySuite.scala  |  9 ++--
 4 files changed, 46 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91..0415d74 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
+  def tableExists(db: Option[String], tableName: String): Boolean
+
   def lookupRelation(
 databaseName: Option[String],
 tableName: String,
@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends 
Catalog {
 tables.clear()
   }
 
+  override def tableExists(db: Option[String], tableName: String): Boolean = {
+val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+tables.get(tblName) match {
+      case Some(_) => true
+      case None => false
+}
+  }
+
   override def lookupRelation(
   databaseName: Option[String],
   tableName: String,
@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
+  abstract override def tableExists(db: Option[String], tableName: String): 
Boolean = {
+val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+overrides.get((dbName, tblName)) match {
+      case Some(_) => true
+      case None => super.tableExists(db, tableName)
+}
+  }
+
   abstract override def lookupRelation(
 databaseName: Option[String],
 tableName: String,
@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+throw new UnsupportedOperationException
+  }
+
   def lookupRelation(
 databaseName: Option[String],
 tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a0..0baf4c9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+val (databaseName, tblName) = processDatabaseAndTableName(
+  db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+client.getTable(databaseName, tblName, false) != null
+  }
+
   def lookupRelation(
   db: Option[String],
   tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/e83f13e8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

git commit: [SPARK-4152] [SQL] Avoid data change in CTAS while table already existed

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 572300ba8 -> 6104754f7


[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed

CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- should throw an exception
CREATE TABLE if not exists t1 AS SELECT key FROM src; -- expected to do nothing;
currently it overwrites t1, which is incorrect.

Author: Cheng Hao hao.ch...@intel.com

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following 
commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed

(cherry picked from commit e83f13e8d37ca33f4e183e977d077221b90c6025)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6104754f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6104754f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6104754f

Branch: refs/heads/branch-1.2
Commit: 6104754f711da9eb0c09daf377bcd750d2d23f8a
Parents: 572300b
Author: Cheng Hao hao.ch...@intel.com
Authored: Mon Nov 3 13:59:43 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 14:00:06 2014 -0800

--
 .../spark/sql/catalyst/analysis/Catalog.scala   | 22 
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 ++
 .../hive/execution/CreateTableAsSelect.scala| 12 ++-
 .../sql/hive/execution/SQLQuerySuite.scala  |  9 ++--
 4 files changed, 46 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91..0415d74 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
+  def tableExists(db: Option[String], tableName: String): Boolean
+
   def lookupRelation(
 databaseName: Option[String],
 tableName: String,
@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends 
Catalog {
 tables.clear()
   }
 
+  override def tableExists(db: Option[String], tableName: String): Boolean = {
+val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+tables.get(tblName) match {
+      case Some(_) => true
+      case None => false
+}
+  }
+
   override def lookupRelation(
   databaseName: Option[String],
   tableName: String,
@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
+  abstract override def tableExists(db: Option[String], tableName: String): 
Boolean = {
+val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+overrides.get((dbName, tblName)) match {
+      case Some(_) => true
+      case None => super.tableExists(db, tableName)
+}
+  }
+
   abstract override def lookupRelation(
 databaseName: Option[String],
 tableName: String,
@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+throw new UnsupportedOperationException
+  }
+
   def lookupRelation(
 databaseName: Option[String],
 tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a0..0baf4c9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+val (databaseName, tblName) = processDatabaseAndTableName(
+  db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+client.getTable(databaseName, tblName, false) != null
+  }
+
   def lookupRelation(
   db: Option[String],
   tableName: String,


git commit: [SQL] More aggressive defaults

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 6104754f7 -> 51985f78c


[SQL] More aggressive defaults

 - Turns on compression for in-memory cached data by default
 - Changes the default parquet compression format back to gzip (we have seen 
more OOMs with production workloads due to the way Snappy allocates memory)
 - Ups the batch size to 10,000 rows
 - Increases the broadcast threshold to 10mb.
 - Uses our parquet implementation instead of the hive one by default.
 - Cache parquet metadata by default.
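If the previous behaviour is preferred, the new defaults can be overridden per session; a hedged PySpark sketch (assuming the SET command is accepted through sql(), with an app name chosen here only for illustration):

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="sql-defaults-demo")
sqlContext = SQLContext(sc)

# Each SET targets one of the properties whose default this commit changes.
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy").collect()        # back to snappy
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.batchSize=1000").collect()  # old batch size
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=-1").collect()           # disable broadcast joins
```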

Author: Michael Armbrust mich...@databricks.com

Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:

97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults

(cherry picked from commit 25bef7e6951301e93004567fc0cef96bf8d1a224)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51985f78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51985f78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51985f78

Branch: refs/heads/branch-1.2
Commit: 51985f78ca5f728f8b9233b703110f541d27b274
Parents: 6104754
Author: Michael Armbrust mich...@databricks.com
Authored: Mon Nov 3 14:08:27 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 14:08:40 2014 -0800

--
 docs/sql-programming-guide.md | 18 +-
 .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 +-
 .../sql/parquet/ParquetTableOperations.scala  |  6 +++---
 .../org/apache/spark/sql/hive/HiveContext.scala   |  2 +-
 4 files changed, 22 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51985f78/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d4ade93..e399fec 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
 </tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     Turns on caching of Parquet schema metadata.  Can speed up querying of static data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.parquet.compression.codec</code></td>
-  <td>snappy</td>
+  <td>gzip</td>
   <td>
     Sets the compression codec use when writing Parquet files. Acceptable values include:
     uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
+  <td>true</td>
+  <td>
+    When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in
+    support.
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     When set to true Spark SQL will automatically select a compression codec for each column based
     on statistics of the data.
@@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 </tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
-  <td>1000</td>
+  <td>10000</td>
   <td>
     Controls the size of batches for columnar caching.  Larger batch sizes can improve memory utilization
     and compression, but risk OOMs when caching data.
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
     <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
-    <td>10000</td>
+    <td>10485760 (10 MB)</td>
     <td>
       Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
       performing a join.  By setting this value to -1 broadcasting can be disabled.  Note that currently

http://git-wip-us.apache.org/repos/asf/spark/blob/51985f78/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 07e6e2e..279495a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -79,13 +79,13 @@ private[sql] trait SQLConf {
   private[spark] def dialect: String = getConf(DIALECT, "sql")
 
   /** When 

git commit: [SQL] More aggressive defaults

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master e83f13e8d -> 25bef7e69


[SQL] More aggressive defaults

 - Turns on compression for in-memory cached data by default
 - Changes the default parquet compression format back to gzip (we have seen 
more OOMs with production workloads due to the way Snappy allocates memory)
 - Ups the batch size to 10,000 rows
 - Increases the broadcast threshold to 10mb.
 - Uses our parquet implementation instead of the hive one by default.
 - Cache parquet metadata by default.

Author: Michael Armbrust mich...@databricks.com

Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:

97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25bef7e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25bef7e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25bef7e6

Branch: refs/heads/master
Commit: 25bef7e6951301e93004567fc0cef96bf8d1a224
Parents: e83f13e
Author: Michael Armbrust mich...@databricks.com
Authored: Mon Nov 3 14:08:27 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 14:08:27 2014 -0800

--
 docs/sql-programming-guide.md | 18 +-
 .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 +-
 .../sql/parquet/ParquetTableOperations.scala  |  6 +++---
 .../org/apache/spark/sql/hive/HiveContext.scala   |  2 +-
 4 files changed, 22 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d4ade93..e399fec 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
 </tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     Turns on caching of Parquet schema metadata.  Can speed up querying of static data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.parquet.compression.codec</code></td>
-  <td>snappy</td>
+  <td>gzip</td>
   <td>
     Sets the compression codec use when writing Parquet files. Acceptable values include: 
     uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
+  <td>true</td>
+  <td>
+    When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in
+    support.
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     When set to true Spark SQL will automatically select a compression codec for each column based
     on statistics of the data.
@@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 </tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
-  <td>1000</td>
+  <td>10000</td>
   <td>
     Controls the size of batches for columnar caching.  Larger batch sizes can improve memory utilization
     and compression, but risk OOMs when caching data.
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
     <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
-    <td>10000</td>
+    <td>10485760 (10 MB)</td>
     <td>
       Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
       performing a join.  By setting this value to -1 broadcasting can be disabled.  Note that currently

http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 07e6e2e..279495a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -79,13 +79,13 @@ private[sql] trait SQLConf {
   private[spark] def dialect: String = getConf(DIALECT, "sql")
 
   /** When true tables cached using the in-memory columnar caching will be 
compressed. */
-  private[spark] def useCompression: Boolean = 

git commit: SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...

2014-11-03 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 25bef7e69 -> 28128150e


SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...

...ntiation

Author: Sandy Ryza sa...@cloudera.com

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in 
RecordReader instantiation
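
The gist of the fix is ordering: the FileSystem bytes-read callback has to be wired up before the RecordReader is constructed, because some record readers already consume input in their constructor. A small self-contained Scala illustration of why the order matters; the classes below are stand-ins, not Spark or Hadoop code:

import java.io.{ByteArrayInputStream, InputStream}

// Stand-in for a RecordReader whose constructor already reads a header.
class HeaderReadingReader(in: InputStream) {
  private val header = new Array[Byte](4)
  in.read(header)                          // bytes consumed at construction time
  def nextRecord(): Int = in.read()
}

// Stand-in for the FileSystem statistics callback: counts bytes as they are read.
class CountingStream(underlying: InputStream) extends InputStream {
  var bytesRead = 0L
  override def read(): Int = {
    val b = underlying.read()
    if (b >= 0) bytesRead += 1
    b
  }
  override def read(buf: Array[Byte], off: Int, len: Int): Int = {
    val n = underlying.read(buf, off, len)
    if (n > 0) bytesRead += n
    n
  }
}

object ReadOrderDemo {
  def main(args: Array[String]): Unit = {
    val counted = new CountingStream(new ByteArrayInputStream(Array.fill[Byte](16)(1.toByte)))
    // Counting is in place BEFORE the reader exists, so the 4 header bytes
    // read by the constructor are included in the metric.
    val reader = new HeaderReadingReader(counted)
    reader.nextRecord()
    println(s"bytes counted: ${counted.bytesRead}")  // 5: 4 header bytes + 1 record byte
  }
}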


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28128150
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28128150
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28128150

Branch: refs/heads/master
Commit: 28128150e7e0c2b7d1c483e67214bdaef59f7d75
Parents: 25bef7e
Author: Sandy Ryza sa...@cloudera.com
Authored: Mon Nov 3 15:19:01 2014 -0800
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Nov 3 15:19:01 2014 -0800

--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 25 ++
 .../org/apache/spark/rdd/NewHadoopRDD.scala | 26 ++-
 .../spark/metrics/InputMetricsSuite.scala   | 27 ++--
 3 files changed, 53 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb56..a157e36 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
 
   val split = theSplit.asInstanceOf[HadoopPartition]
   logInfo("Input split: " + split.inputSplit)
-  var reader: RecordReader[K, V] = null
   val jobConf = getJobConf()
-  val inputFormat = getInputFormat(jobConf)
-  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
-  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
-
-  // Register an on-task-completion callback to close the input stream.
-  context.addTaskCompletionListener{ context => closeIfNeeded() }
-  val key: K = reader.createKey()
-  val value: V = reader.createValue()
 
   val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-  // Find a function that will return the FileSystem bytes read by this 
thread.
+  // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
+  // creating RecordReader, because RecordReader's constructor might read 
some bytes
   val bytesReadCallback = if 
(split.inputSplit.value.isInstanceOf[FileSplit]) {
 SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
   split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
   if (bytesReadCallback.isDefined) {
 context.taskMetrics.inputMetrics = Some(inputMetrics)
   }
+
+  var reader: RecordReader[K, V] = null
+  val inputFormat = getInputFormat(jobConf)
+  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
+
+  // Register an on-task-completion callback to close the input stream.
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+  val key: K = reader.createKey()
+  val value: V = reader.createValue()
+
   var recordsSinceMetricsUpdate = 0
 
   override def getNext() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6d6b867..351e145 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
   val split = theSplit.asInstanceOf[NewHadoopPartition]
   logInfo(Input split:  + split.serializableHadoopSplit)
   val conf = confBroadcast.value.value
-  val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
-  val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-  val format = inputFormatClass.newInstance
-  format match {
-case configurable: Configurable 

git commit: SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...

2014-11-03 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 51985f78c -> fa86d862f


SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader insta...

...ntiation

Author: Sandy Ryza sa...@cloudera.com

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in 
RecordReader instantiation

(cherry picked from commit 28128150e7e0c2b7d1c483e67214bdaef59f7d75)
Signed-off-by: Patrick Wendell pwend...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa86d862
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa86d862
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa86d862

Branch: refs/heads/branch-1.2
Commit: fa86d862f98cfea3d9afff6e61b3141c9b08f949
Parents: 51985f7
Author: Sandy Ryza sa...@cloudera.com
Authored: Mon Nov 3 15:19:01 2014 -0800
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Nov 3 15:19:13 2014 -0800

--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 25 ++
 .../org/apache/spark/rdd/NewHadoopRDD.scala | 26 ++-
 .../spark/metrics/InputMetricsSuite.scala   | 27 ++--
 3 files changed, 53 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa86d862/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb56..a157e36 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
 
   val split = theSplit.asInstanceOf[HadoopPartition]
   logInfo("Input split: " + split.inputSplit)
-  var reader: RecordReader[K, V] = null
   val jobConf = getJobConf()
-  val inputFormat = getInputFormat(jobConf)
-  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
-  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
-
-  // Register an on-task-completion callback to close the input stream.
-  context.addTaskCompletionListener{ context => closeIfNeeded() }
-  val key: K = reader.createKey()
-  val value: V = reader.createValue()
 
   val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-  // Find a function that will return the FileSystem bytes read by this 
thread.
+  // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
+  // creating RecordReader, because RecordReader's constructor might read 
some bytes
   val bytesReadCallback = if 
(split.inputSplit.value.isInstanceOf[FileSplit]) {
 SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
   split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
   if (bytesReadCallback.isDefined) {
 context.taskMetrics.inputMetrics = Some(inputMetrics)
   }
+
+  var reader: RecordReader[K, V] = null
+  val inputFormat = getInputFormat(jobConf)
+  HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+  reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
+
+  // Register an on-task-completion callback to close the input stream.
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+  val key: K = reader.createKey()
+  val value: V = reader.createValue()
+
   var recordsSinceMetricsUpdate = 0
 
   override def getNext() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fa86d862/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6d6b867..351e145 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
   val split = theSplit.asInstanceOf[NewHadoopPartition]
   logInfo(Input split:  + split.serializableHadoopSplit)
   val conf = confBroadcast.value.value
-  val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
-  val hadoopAttemptContext = 

git commit: [SQL] Convert arguments to Scala UDFs

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 28128150e -> 15b58a223


[SQL] Convert arguments to Scala UDFs

Author: Michael Armbrust mich...@databricks.com

Closes #3077 from marmbrus/udfsWithUdts and squashes the following commits:

34b5f27 [Michael Armbrust] style
504adef [Michael Armbrust] Convert arguments to Scala UDFs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15b58a22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15b58a22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15b58a22

Branch: refs/heads/master
Commit: 15b58a2234ab7ba30c9c0cbb536177a3c725e350
Parents: 2812815
Author: Michael Armbrust mich...@databricks.com
Authored: Mon Nov 3 18:04:51 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 18:04:51 2014 -0800

--
 .../sql/catalyst/expressions/ScalaUdf.scala | 560 ++-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |  18 +-
 2 files changed, 316 insertions(+), 262 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/15b58a22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index fa1786e..18c96da 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -34,320 +34,366 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
 
  override def toString = s"scalaUDF(${children.mkString(",")})"
 
+  // scalastyle:off
+
   /** This method has been generated by this script
 
 (1 to 22).map { x =
   val anys = (1 to x).map(x = Any).reduce(_ + ,  + _)
-  val evals = (0 to x - 1).map(x = schildren($x).eval(input)).reduce(_ 
+ ,\n + _)
+  val evals = (0 to x - 1).map(x = s
ScalaReflection.convertToScala(children($x).eval(input), 
children($x).dataType)).reduce(_ + ,\n + _)
 
 s
 case $x =
   function.asInstanceOf[($anys) = Any](
-  $evals)
+$evals)
 
-}
+}.foreach(println)
 
   */
 
-  // scalastyle:off
   override def eval(input: Row): Any = {
 val result = children.size match {
   case 0 = function.asInstanceOf[() = Any]()
-  case 1 = function.asInstanceOf[(Any) = Any](children(0).eval(input))
+  case 1 =
+function.asInstanceOf[(Any) = Any](
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType))
+
+
   case 2 =
 function.asInstanceOf[(Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType))
+
+
   case 3 =
 function.asInstanceOf[(Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType))
+
+
   case 4 =
 function.asInstanceOf[(Any, Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input),
-  children(3).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType),
+  ScalaReflection.convertToScala(children(3).eval(input), 
children(3).dataType))
+
+
   case 5 =
 function.asInstanceOf[(Any, Any, Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input),
-  children(3).eval(input),
-  children(4).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType),
+  ScalaReflection.convertToScala(children(3).eval(input), 
children(3).dataType),
+  ScalaReflection.convertToScala(children(4).eval(input), 

git commit: [SQL] Convert arguments to Scala UDFs

2014-11-03 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 fa86d862f -> 52db2b942


[SQL] Convert arguments to Scala UDFs

Author: Michael Armbrust mich...@databricks.com

Closes #3077 from marmbrus/udfsWithUdts and squashes the following commits:

34b5f27 [Michael Armbrust] style
504adef [Michael Armbrust] Convert arguments to Scala UDFs

(cherry picked from commit 15b58a2234ab7ba30c9c0cbb536177a3c725e350)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52db2b94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52db2b94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52db2b94

Branch: refs/heads/branch-1.2
Commit: 52db2b9429e00d8ed398a2432ad6a26cd1e5920c
Parents: fa86d86
Author: Michael Armbrust mich...@databricks.com
Authored: Mon Nov 3 18:04:51 2014 -0800
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Nov 3 18:05:02 2014 -0800

--
 .../sql/catalyst/expressions/ScalaUdf.scala | 560 ++-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |  18 +-
 2 files changed, 316 insertions(+), 262 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52db2b94/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index fa1786e..18c96da 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -34,320 +34,366 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
 
  override def toString = s"scalaUDF(${children.mkString(",")})"
 
+  // scalastyle:off
+
   /** This method has been generated by this script
 
 (1 to 22).map { x =
   val anys = (1 to x).map(x = Any).reduce(_ + ,  + _)
-  val evals = (0 to x - 1).map(x = schildren($x).eval(input)).reduce(_ 
+ ,\n + _)
+  val evals = (0 to x - 1).map(x = s
ScalaReflection.convertToScala(children($x).eval(input), 
children($x).dataType)).reduce(_ + ,\n + _)
 
 s
 case $x =
   function.asInstanceOf[($anys) = Any](
-  $evals)
+$evals)
 
-}
+}.foreach(println)
 
   */
 
-  // scalastyle:off
   override def eval(input: Row): Any = {
 val result = children.size match {
   case 0 = function.asInstanceOf[() = Any]()
-  case 1 = function.asInstanceOf[(Any) = Any](children(0).eval(input))
+  case 1 =
+function.asInstanceOf[(Any) = Any](
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType))
+
+
   case 2 =
 function.asInstanceOf[(Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType))
+
+
   case 3 =
 function.asInstanceOf[(Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType))
+
+
   case 4 =
 function.asInstanceOf[(Any, Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input),
-  children(3).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType),
+  ScalaReflection.convertToScala(children(3).eval(input), 
children(3).dataType))
+
+
   case 5 =
 function.asInstanceOf[(Any, Any, Any, Any, Any) = Any](
-  children(0).eval(input),
-  children(1).eval(input),
-  children(2).eval(input),
-  children(3).eval(input),
-  children(4).eval(input))
+  ScalaReflection.convertToScala(children(0).eval(input), 
children(0).dataType),
+  ScalaReflection.convertToScala(children(1).eval(input), 
children(1).dataType),
+  ScalaReflection.convertToScala(children(2).eval(input), 
children(2).dataType),
+  

git commit: [SPARK-4168][WebUI] web stages number should show correctly when stages are more than 1000

2014-11-03 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 15b58a223 -> 97a466eca


[SPARK-4168][WebUI] web stages number should show correctly when stages are 
more than 1000

The number of completed stages and failed stages shown on the web UI will always be 
less than 1000. This is really misleading when there are already thousands of 
completed or failed stages. The numbers should be correct even when only part of the 
stages are listed on the web UI (stage info is removed when the number of stages 
grows too large).
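
The fix keeps a plain counter next to each trimmed buffer, so the page can report the true totals even after old StageInfo entries have been dropped. A minimal sketch of that bookkeeping pattern, with simplified types rather than the actual listener:

import scala.collection.mutable.ListBuffer

// The buffer is trimmed to bound memory, so the true total lives in a counter.
class StageBookkeeping(retainedStages: Int = 1000) {
  val completedStages = ListBuffer[String]()
  var numCompletedStages = 0          // never trimmed; this is what the UI reports

  def onStageCompleted(stageName: String): Unit = {
    completedStages += stageName
    numCompletedStages += 1
    if (completedStages.size > retainedStages) {
      completedStages.trimStart(completedStages.size - retainedStages)
    }
  }
}

object StageBookkeepingDemo {
  def main(args: Array[String]): Unit = {
    val bk = new StageBookkeeping(retainedStages = 10)
    (1 to 2500).foreach(i => bk.onStageCompleted(s"stage-$i"))
    println(bk.completedStages.size)   // 10   -> what the old page effectively showed
    println(bk.numCompletedStages)     // 2500 -> what the patched page shows
  }
}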

Author: Zhang, Liye liye.zh...@intel.com

Closes #3035 from liyezhang556520/webStageNum and squashes the following 
commits:

d9e29fb [Zhang, Liye] add detailed comments for variables
4ea8fd1 [Zhang, Liye] change variable name accroding to comments
f4c404d [Zhang, Liye] [SPARK-4168][WebUI] web stages number should show 
correctly when stages are more than 1000


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a466ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a466ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a466ec

Branch: refs/heads/master
Commit: 97a466eca0a629f17e9662ca2b59eeca99142c54
Parents: 15b58a2
Author: Zhang, Liye liye.zh...@intel.com
Authored: Mon Nov 3 18:17:32 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Mon Nov 3 18:17:32 2014 -0800

--
 .../org/apache/spark/ui/jobs/JobProgressListener.scala|  9 +
 .../scala/org/apache/spark/ui/jobs/JobProgressPage.scala  | 10 ++
 2 files changed, 15 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/97a466ec/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b520736..e322340 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -59,6 +59,13 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   val failedStages = ListBuffer[StageInfo]()
   val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
   val stageIdToInfo = new HashMap[StageId, StageInfo]
+  
+  // Number of completed and failed stages, may not actually equal to 
completedStages.size and 
+  // failedStages.size respectively due to completedStage and failedStages 
only maintain the latest
+  // part of the stages, the earlier ones will be removed when there are too 
many stages for 
+  // memory sake.
+  var numCompletedStages = 0
+  var numFailedStages = 0
 
   // Map from pool name to a hash map (map from stage id to StageInfo).
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
@@ -110,9 +117,11 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
 activeStages.remove(stage.stageId)
 if (stage.failureReason.isEmpty) {
   completedStages += stage
+  numCompletedStages += 1
   trimIfNecessary(completedStages)
 } else {
   failedStages += stage
+  numFailedStages += 1
   trimIfNecessary(failedStages)
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/97a466ec/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 6e718ee..83a7898 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -34,7 +34,9 @@ private[ui] class JobProgressPage(parent: JobProgressTab) 
extends WebUIPage()
 listener.synchronized {
   val activeStages = listener.activeStages.values.toSeq
   val completedStages = listener.completedStages.reverse.toSeq
+  val numCompletedStages = listener.numCompletedStages
   val failedStages = listener.failedStages.reverse.toSeq
+  val numFailedStages = listener.numFailedStages
   val now = System.currentTimeMillis
 
   val activeStagesTable =
@@ -69,11 +71,11 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage()
                 </li>
                 <li>
                   <a href="#completed"><strong>Completed Stages:</strong></a>
-                  {completedStages.size}
+                  {numCompletedStages}
                 </li>
                 <li>
                   <a href="#failed"><strong>Failed Stages:</strong></a>
-                  {failedStages.size}
+                  {numFailedStages}
                 </li>
               </ul>
             </div>
@@ -86,9 +88,9 

git commit: [SPARK-611] Display executor thread dumps in web UI

2014-11-03 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 97a466eca -> 4f035dd2c


[SPARK-611] Display executor thread dumps in web UI

This patch allows executor thread dumps to be collected on-demand and viewed in 
the Spark web UI.

The thread dumps are collected using Thread.getAllStackTraces().  To allow 
remote thread dumps to be triggered from the web UI, I added a new 
`ExecutorActor` that runs inside of the Executor actor system and responds to 
RPCs from the driver.  The driver's mechanism for obtaining a reference to this 
actor is a little bit hacky: it uses the block manager master actor to 
determine the host/port of the executor actor systems in order to construct 
ActorRefs to ExecutorActor.  Unfortunately, I couldn't find a much cleaner way 
to do this without a big refactoring of the executor - driver communication.
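
For reference, the collection step itself is a one-liner on the JVM; most of the patch is the plumbing needed to trigger it remotely and render the result. A hedged sketch of the local collection primitive the patch relies on; the formatting below is illustrative, not the UI's exact output:

import scala.collection.JavaConverters._

// Dump a stack trace for every live thread in this JVM, using the same
// primitive (Thread.getAllStackTraces) that the patch calls on each executor.
object LocalThreadDump {
  def main(args: Array[String]): Unit = {
    val dump = Thread.getAllStackTraces.asScala.map { case (thread, frames) =>
      val header = s"""Thread "${thread.getName}" (${thread.getState})"""
      val body = frames.map(frame => "    at " + frame).mkString("\n")
      header + "\n" + body
    }
    dump.foreach(println)
  }
}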

Screenshots:

![image](https://cloud.githubusercontent.com/assets/50748/4781793/7e7a0776-5cbf-11e4-874d-a91cd04620bd.png)

![image](https://cloud.githubusercontent.com/assets/50748/4781794/8bce76aa-5cbf-11e4-8d13-8477748c9f7e.png)

![image](https://cloud.githubusercontent.com/assets/50748/4781797/bd11a8b8-5cbf-11e4-9ad7-a7459467ec8e.png)

Author: Josh Rosen joshro...@databricks.com

Closes #2944 from JoshRosen/jstack-in-web-ui and squashes the following commits:

3c21a5d [Josh Rosen] Address review comments:
880f7f7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
jstack-in-web-ui
f719266 [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
jstack-in-web-ui
19707b0 [Josh Rosen] Add one comment.
127a130 [Josh Rosen] Update to use SparkContext.DRIVER_IDENTIFIER
b8e69aa [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
jstack-in-web-ui
3dfc2d4 [Josh Rosen] Add missing file.
bc1e675 [Josh Rosen] Undo some leftover changes from the earlier approach.
f4ac1c1 [Josh Rosen] Switch to on-demand collection of thread dumps
dfec08b [Josh Rosen] Add option to disable thread dumps in UI.
4c87d7f [Josh Rosen] Use separate RPC for sending thread dumps.
2b8bdf3 [Josh Rosen] Enable thread dumps from the driver when running in 
non-local mode.
cc3e6b3 [Josh Rosen] Fix test code in DAGSchedulerSuite.
87b8b65 [Josh Rosen] Add new listener event for thread dumps.
8c10216 [Josh Rosen] Add missing file.
0f198ac [Josh Rosen] [SPARK-611] Display executor thread dumps in web UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f035dd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f035dd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f035dd2

Branch: refs/heads/master
Commit: 4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84
Parents: 97a466e
Author: Josh Rosen joshro...@databricks.com
Authored: Mon Nov 3 18:18:47 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Mon Nov 3 18:18:47 2014 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   | 29 +++-
 .../executor/CoarseGrainedExecutorBackend.scala |  3 +-
 .../org/apache/spark/executor/Executor.scala|  7 +-
 .../apache/spark/executor/ExecutorActor.scala   | 41 +++
 .../spark/storage/BlockManagerMaster.scala  |  4 ++
 .../spark/storage/BlockManagerMasterActor.scala | 18 +
 .../spark/storage/BlockManagerMessages.scala|  2 +
 .../spark/ui/exec/ExecutorThreadDumpPage.scala  | 73 
 .../apache/spark/ui/exec/ExecutorsPage.scala| 15 +++-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  8 ++-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 14 
 .../apache/spark/util/ThreadStackTrace.scala| 27 
 .../scala/org/apache/spark/util/Utils.scala | 13 
 13 files changed, 247 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8b4db78..40444c2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
 import 

git commit: [FIX][MLLIB] fix seed in BaggedPointSuite

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 4f035dd2c -> c5912ecc7


[FIX][MLLIB] fix seed in BaggedPointSuite

Saw Jenkins test failures due to random seeds.
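
The pattern behind the fix is worth spelling out: randomized tests stay meaningful yet reproducible when they iterate over a small fixed set of seeds instead of drawing a fresh seed on every run. A tiny standalone sketch of that style, using plain scala.util.Random rather than the MLlib API:

import scala.util.Random

// Exercise a randomized computation under a few fixed seeds so any failure
// can be reproduced exactly, instead of depending on a per-run seed.
object FixedSeedCheck {
  def main(args: Array[String]): Unit = {
    val seeds = Seq(123L, 456L, 789L)
    seeds.foreach { seed =>
      val rng = new Random(seed)
      val sample = Seq.fill(1000)(rng.nextDouble())
      val mean = sample.sum / sample.size
      // Loose statistical bound; deterministic for a given seed.
      assert(math.abs(mean - 0.5) < 0.05, s"unexpected mean $mean for seed $seed")
    }
    println("all seeds passed")
  }
}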

jkbradley manishamde

Author: Xiangrui Meng m...@databricks.com

Closes #3084 from mengxr/fix-baggedpoint-suite and squashes the following 
commits:

f735a43 [Xiangrui Meng] fix seed in BaggedPointSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5912ecc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5912ecc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5912ecc

Branch: refs/heads/master
Commit: c5912ecc7b392a13089ae735c07c2d7256de36c6
Parents: 4f035dd
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 18:50:37 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 18:50:37 2014 -0800

--
 .../apache/spark/mllib/tree/impl/BaggedPointSuite.scala   | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5912ecc/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
index c0a62e0..5cb4332 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
@@ -30,7 +30,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
   test("BaggedPoint RDD: without subsampling") {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
-val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false)
+val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, 42)
 baggedRDD.collect().foreach { baggedPoint =>
   assert(baggedPoint.subsampleWeights.size == 1 && baggedPoint.subsampleWeights(0) == 1)
 }
@@ -44,7 +44,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
true)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
true, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -60,7 +60,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, true)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, true, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -75,7 +75,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
false)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
false, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -91,7 +91,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, false)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, false, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: 

git commit: [FIX][MLLIB] fix seed in BaggedPointSuite

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 52db2b942 -> 0826eed9c


[FIX][MLLIB] fix seed in BaggedPointSuite

Saw Jenkins test failures due to random seeds.

jkbradley manishamde

Author: Xiangrui Meng m...@databricks.com

Closes #3084 from mengxr/fix-baggedpoint-suite and squashes the following 
commits:

f735a43 [Xiangrui Meng] fix seed in BaggedPointSuite

(cherry picked from commit c5912ecc7b392a13089ae735c07c2d7256de36c6)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0826eed9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0826eed9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0826eed9

Branch: refs/heads/branch-1.2
Commit: 0826eed9c84a73544e3d8289834c8b5ebac47e03
Parents: 52db2b9
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 18:50:37 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 18:50:51 2014 -0800

--
 .../apache/spark/mllib/tree/impl/BaggedPointSuite.scala   | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0826eed9/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
index c0a62e0..5cb4332 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/tree/impl/BaggedPointSuite.scala
@@ -30,7 +30,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
   test("BaggedPoint RDD: without subsampling") {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
-val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false)
+val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, 42)
 baggedRDD.collect().foreach { baggedPoint =>
   assert(baggedPoint.subsampleWeights.size == 1 && baggedPoint.subsampleWeights(0) == 1)
 }
@@ -44,7 +44,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
true)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
true, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -60,7 +60,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, true)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, true, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -75,7 +75,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
false)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, 
false, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)
@@ -91,7 +91,7 @@ class BaggedPointSuite extends FunSuite with 
LocalSparkContext  {
 val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
 val rdd = sc.parallelize(arr)
 seeds.foreach { seed =>
-  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, false)
+  val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, 
numSubsamples, false, seed)
   val subsampleCounts: Array[Array[Double]] = 
baggedRDD.map(_.subsampleWeights).collect()
   EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, 
expectedMean,
 expectedStddev, epsilon = 0.01)



git commit: [SPARK-4192][SQL] Internal API for Python UDT

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master c5912ecc7 -> 04450d115


[SPARK-4192][SQL] Internal API for Python UDT

Following #2919, this PR adds Python UDT (for internal use only) with tests 
under pyspark.tests. Before `SQLContext.applySchema`, we check whether we 
need to convert user-type instances into SQL recognizable data. In the current 
implementation, a Python UDT must be paired with a Scala UDT for serialization 
on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and 
Python.

marmbrus jkbradley davies

Author: Xiangrui Meng m...@databricks.com

Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits:

acff637 [Xiangrui Meng] merge master
dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well
2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion
7c4a6a9 [Xiangrui Meng] address comments
75223db [Xiangrui Meng] minor update
f740379 [Xiangrui Meng] remove UDT from default imports
e98d9d0 [Xiangrui Meng] fix py style
4e84fce [Xiangrui Meng] remove local hive tests and add more tests
39f19e0 [Xiangrui Meng] add tests
b7f666d [Xiangrui Meng] add Python UDT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04450d11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04450d11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04450d11

Branch: refs/heads/master
Commit: 04450d11548cfb25d4fb77d4a33e3a7cd4254183
Parents: c5912ec
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 19:29:11 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 19:29:11 2014 -0800

--
 python/pyspark/sql.py   | 206 ++-
 python/pyspark/tests.py |  93 -
 .../spark/sql/catalyst/types/dataTypes.scala|   9 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +
 .../apache/spark/sql/execution/pythonUdfs.scala |   5 +
 .../apache/spark/sql/test/ExamplePointUDT.scala |  64 ++
 .../sql/types/util/DataTypeConversions.scala|   1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04450d11/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 675df08..d16c18b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -417,6 +417,75 @@ class StructType(DataType):
 return StructType([StructField.fromJson(f) for f in json[fields]])
 
 
+class UserDefinedType(DataType):
+
+:: WARN: Spark Internal Use Only ::
+SQL User-Defined Type (UDT).
+
+
+@classmethod
+def typeName(cls):
+return cls.__name__.lower()
+
+@classmethod
+def sqlType(cls):
+
+Underlying SQL storage type for this UDT.
+
+raise NotImplementedError(UDT must implement sqlType().)
+
+@classmethod
+def module(cls):
+
+The Python module of the UDT.
+
+raise NotImplementedError(UDT must implement module().)
+
+@classmethod
+def scalaUDT(cls):
+
+The class name of the paired Scala UDT.
+
+raise NotImplementedError(UDT must have a paired Scala UDT.)
+
+def serialize(self, obj):
+
+Converts the a user-type object into a SQL datum.
+
+raise NotImplementedError(UDT must implement serialize().)
+
+def deserialize(self, datum):
+
+Converts a SQL datum into a user-type object.
+
+raise NotImplementedError(UDT must implement deserialize().)
+
+def json(self):
+return json.dumps(self.jsonValue(), separators=(',', ':'), 
sort_keys=True)
+
+def jsonValue(self):
+schema = {
+type: udt,
+class: self.scalaUDT(),
+pyClass: %s.%s % (self.module(), type(self).__name__),
+sqlType: self.sqlType().jsonValue()
+}
+return schema
+
+@classmethod
+def fromJson(cls, json):
+pyUDT = json[pyClass]
+split = pyUDT.rfind(.)
+pyModule = pyUDT[:split]
+pyClass = pyUDT[split+1:]
+m = __import__(pyModule, globals(), locals(), [pyClass], -1)
+UDT = getattr(m, pyClass)
+return UDT()
+
+def __eq__(self, other):
+return type(self) == type(other)
+
+
 _all_primitive_types = dict((v.typeName(), v)
 for v in globals().itervalues()
 if type(v) is PrimitiveTypeSingleton and
@@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string):
 ...   complex_arraytype, False)
  check_datatype(complex_maptype)
 True
+ check_datatype(ExamplePointUDT())
+   

git commit: [SPARK-4192][SQL] Internal API for Python UDT

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 0826eed9c -> 42d02db86


[SPARK-4192][SQL] Internal API for Python UDT

Following #2919, this PR adds Python UDT (for internal use only) with tests 
under pyspark.tests. Before `SQLContext.applySchema`, we check whether we 
need to convert user-type instances into SQL recognizable data. In the current 
implementation, a Python UDT must be paired with a Scala UDT for serialization 
on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and 
Python.

marmbrus jkbradley davies

Author: Xiangrui Meng m...@databricks.com

Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits:

acff637 [Xiangrui Meng] merge master
dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well
2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion
7c4a6a9 [Xiangrui Meng] address comments
75223db [Xiangrui Meng] minor update
f740379 [Xiangrui Meng] remove UDT from default imports
e98d9d0 [Xiangrui Meng] fix py style
4e84fce [Xiangrui Meng] remove local hive tests and add more tests
39f19e0 [Xiangrui Meng] add tests
b7f666d [Xiangrui Meng] add Python UDT

(cherry picked from commit 04450d11548cfb25d4fb77d4a33e3a7cd4254183)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d02db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d02db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d02db8

Branch: refs/heads/branch-1.2
Commit: 42d02db86cd973cf31ceeede0c5a723238bbe746
Parents: 0826eed
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 19:29:11 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 19:30:32 2014 -0800

--
 python/pyspark/sql.py   | 206 ++-
 python/pyspark/tests.py |  93 -
 .../spark/sql/catalyst/types/dataTypes.scala|   9 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +
 .../apache/spark/sql/execution/pythonUdfs.scala |   5 +
 .../apache/spark/sql/test/ExamplePointUDT.scala |  64 ++
 .../sql/types/util/DataTypeConversions.scala|   1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42d02db8/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 675df08..d16c18b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -417,6 +417,75 @@ class StructType(DataType):
 return StructType([StructField.fromJson(f) for f in json[fields]])
 
 
+class UserDefinedType(DataType):
+
+:: WARN: Spark Internal Use Only ::
+SQL User-Defined Type (UDT).
+
+
+@classmethod
+def typeName(cls):
+return cls.__name__.lower()
+
+@classmethod
+def sqlType(cls):
+
+Underlying SQL storage type for this UDT.
+
+raise NotImplementedError(UDT must implement sqlType().)
+
+@classmethod
+def module(cls):
+
+The Python module of the UDT.
+
+raise NotImplementedError(UDT must implement module().)
+
+@classmethod
+def scalaUDT(cls):
+
+The class name of the paired Scala UDT.
+
+raise NotImplementedError(UDT must have a paired Scala UDT.)
+
+def serialize(self, obj):
+
+Converts the a user-type object into a SQL datum.
+
+raise NotImplementedError(UDT must implement serialize().)
+
+def deserialize(self, datum):
+
+Converts a SQL datum into a user-type object.
+
+raise NotImplementedError(UDT must implement deserialize().)
+
+def json(self):
+return json.dumps(self.jsonValue(), separators=(',', ':'), 
sort_keys=True)
+
+def jsonValue(self):
+schema = {
+type: udt,
+class: self.scalaUDT(),
+pyClass: %s.%s % (self.module(), type(self).__name__),
+sqlType: self.sqlType().jsonValue()
+}
+return schema
+
+@classmethod
+def fromJson(cls, json):
+pyUDT = json[pyClass]
+split = pyUDT.rfind(.)
+pyModule = pyUDT[:split]
+pyClass = pyUDT[split+1:]
+m = __import__(pyModule, globals(), locals(), [pyClass], -1)
+UDT = getattr(m, pyClass)
+return UDT()
+
+def __eq__(self, other):
+return type(self) == type(other)
+
+
 _all_primitive_types = dict((v.typeName(), v)
 for v in globals().itervalues()
 if type(v) is PrimitiveTypeSingleton and
@@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string):
 ...  

git commit: [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 04450d115 -> 1a9c6cdda


[SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD

Register MLlib's Vector as a SQL user-defined type (UDT) in both Scala and 
Python. With this PR, we can easily map a RDD[LabeledPoint] to a SchemaRDD, and 
then select columns or save to a Parquet file. Examples in Scala/Python are 
attached. The Scala code was copied from jkbradley.
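
To make the workflow concrete, a hedged sketch of the round trip the message describes, written against the 1.2-era SchemaRDD API; the input path, output path and table name are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.util.MLUtils

// Load labeled points, expose them as a SchemaRDD (the features column uses the
// new Vector UDT), query a column with SQL, and persist the whole set to Parquet.
object LabeledPointsAsSchemaRDD {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dataset").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit RDD[case class] -> SchemaRDD

    val points = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    points.registerTempTable("points")

    val features = sqlContext.sql("SELECT features FROM points")
    println(features.first())

    points.saveAsParquetFile("/tmp/points.parquet")
    sc.stop()
  }
}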

~~This PR contains the changes from #3068 . I will rebase after #3068 is 
merged.~~

marmbrus jkbradley

Author: Xiangrui Meng m...@databricks.com

Closes #3070 from mengxr/SPARK-3573 and squashes the following commits:

3a0b6e5 [Xiangrui Meng] organize imports
236f0a0 [Xiangrui Meng] register vector as UDT and provide dataset examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a9c6cdd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a9c6cdd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a9c6cdd

Branch: refs/heads/master
Commit: 1a9c6cddadebdc53d083ac3e0da276ce979b5d1f
Parents: 04450d1
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 22:29:48 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 22:29:48 2014 -0800

--
 dev/run-tests   |   2 +-
 .../src/main/python/mllib/dataset_example.py|  62 ++
 .../spark/examples/mllib/DatasetExample.scala   | 121 +++
 mllib/pom.xml   |   5 +
 .../org/apache/spark/mllib/linalg/Vectors.scala |  69 ++-
 .../spark/mllib/linalg/VectorsSuite.scala   |  11 ++
 python/pyspark/mllib/linalg.py  |  50 
 python/pyspark/mllib/tests.py   |  39 +-
 8 files changed, 353 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a9c6cdd/dev/run-tests
--
diff --git a/dev/run-tests b/dev/run-tests
index 0e9eefa..de607e4 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -180,7 +180,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
   if [ -n $_SQL_TESTS_ONLY ]; then
 # This must be an array of individual arguments. Otherwise, having one 
long string
 #+ will be interpreted as a single test, which doesn't work.
-SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test)
+SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test mllib/test)
   else
 SBT_MAVEN_TEST_ARGS=(test)
   fi

http://git-wip-us.apache.org/repos/asf/spark/blob/1a9c6cdd/examples/src/main/python/mllib/dataset_example.py
--
diff --git a/examples/src/main/python/mllib/dataset_example.py 
b/examples/src/main/python/mllib/dataset_example.py
new file mode 100644
index 000..540dae7
--- /dev/null
+++ b/examples/src/main/python/mllib/dataset_example.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+An example of how to use SchemaRDD as a dataset for ML. Run with::
+    bin/spark-submit examples/src/main/python/mllib/dataset_example.py
+"""
+
+import os
+import sys
+import tempfile
+import shutil
+
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.stat import Statistics
+
+
+def summarize(dataset):
+    print "schema: %s" % dataset.schema().json()
+    labels = dataset.map(lambda r: r.label)
+    print "label average: %f" % labels.mean()
+    features = dataset.map(lambda r: r.features)
+    summary = Statistics.colStats(features)
+    print "features average: %r" % summary.mean()
+
+if __name__ == "__main__":
+    if len(sys.argv) > 2:
+        print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
+        exit(-1)
+    sc = SparkContext(appName="DatasetExample")
+    sqlCtx = SQLContext(sc)
+    if len(sys.argv) == 2:
+        input = sys.argv[1]
+    else:
+        input = "data/mllib/sample_libsvm_data.txt"
+    points = MLUtils.loadLibSVMFile(sc, input)
+dataset0 = 

git commit: [SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 42d02db86 -> 8395e8fbd


[SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD

Register MLlib's Vector as a SQL user-defined type (UDT) in both Scala and 
Python. With this PR, we can easily map a RDD[LabeledPoint] to a SchemaRDD, and 
then select columns or save to a Parquet file. Examples in Scala/Python are 
attached. The Scala code was copied from jkbradley.

~~This PR contains the changes from #3068 . I will rebase after #3068 is 
merged.~~

marmbrus jkbradley

Author: Xiangrui Meng m...@databricks.com

Closes #3070 from mengxr/SPARK-3573 and squashes the following commits:

3a0b6e5 [Xiangrui Meng] organize imports
236f0a0 [Xiangrui Meng] register vector as UDT and provide dataset examples

(cherry picked from commit 1a9c6cddadebdc53d083ac3e0da276ce979b5d1f)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8395e8fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8395e8fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8395e8fb

Branch: refs/heads/branch-1.2
Commit: 8395e8fbdf23bef286ec68a4bbadcc448b504c2c
Parents: 42d02db
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 22:29:48 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 22:31:43 2014 -0800

--
 dev/run-tests   |   2 +-
 .../src/main/python/mllib/dataset_example.py|  62 ++
 .../spark/examples/mllib/DatasetExample.scala   | 121 +++
 mllib/pom.xml   |   5 +
 .../org/apache/spark/mllib/linalg/Vectors.scala |  69 ++-
 .../spark/mllib/linalg/VectorsSuite.scala   |  11 ++
 python/pyspark/mllib/linalg.py  |  50 
 python/pyspark/mllib/tests.py   |  39 +-
 8 files changed, 353 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8395e8fb/dev/run-tests
--
diff --git a/dev/run-tests b/dev/run-tests
index 0e9eefa..de607e4 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -180,7 +180,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
   if [ -n $_SQL_TESTS_ONLY ]; then
 # This must be an array of individual arguments. Otherwise, having one 
long string
 #+ will be interpreted as a single test, which doesn't work.
-SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test)
+SBT_MAVEN_TEST_ARGS=(catalyst/test sql/test hive/test mllib/test)
   else
 SBT_MAVEN_TEST_ARGS=(test)
   fi

http://git-wip-us.apache.org/repos/asf/spark/blob/8395e8fb/examples/src/main/python/mllib/dataset_example.py
--
diff --git a/examples/src/main/python/mllib/dataset_example.py 
b/examples/src/main/python/mllib/dataset_example.py
new file mode 100644
index 000..540dae7
--- /dev/null
+++ b/examples/src/main/python/mllib/dataset_example.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+An example of how to use SchemaRDD as a dataset for ML. Run with::
+    bin/spark-submit examples/src/main/python/mllib/dataset_example.py
+"""
+
+import os
+import sys
+import tempfile
+import shutil
+
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.stat import Statistics
+
+
+def summarize(dataset):
+    print "schema: %s" % dataset.schema().json()
+    labels = dataset.map(lambda r: r.label)
+    print "label average: %f" % labels.mean()
+    features = dataset.map(lambda r: r.features)
+    summary = Statistics.colStats(features)
+    print "features average: %r" % summary.mean()
+
+if __name__ == "__main__":
+    if len(sys.argv) > 2:
+        print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
+        exit(-1)
+    sc = SparkContext(appName="DatasetExample")
+    sqlCtx = SQLContext(sc)
+    if len(sys.argv) == 2:
+        input = sys.argv[1]
+    else:
+

git commit: [SPARK-4163][Core] Add a backward compatibility test for FetchFailed

2014-11-03 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 1a9c6cdda -> 9bdc8412a


[SPARK-4163][Core] Add a backward compatibility test for FetchFailed

/cc aarondav
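
The shape of the test: serialize a new-style value, strip the JSON field that
older releases never wrote, and check that deserialization falls back to a
default. A purely illustrative Python sketch of that pattern (the real test
below works on JsonProtocol's JSON in Scala; the to_json/from_json helpers here
are made up):

import json

def to_json(reason):
    # New writers always emit "Message".
    return {"Reason": "FetchFailed", "Message": reason["message"]}

def from_json(obj):
    # Old events (Spark 1.1.0-style) lack "Message"; default it on read.
    return {"message": obj.get("Message", "Unknown reason")}

new_event = to_json({"message": "ignored"})
old_event = dict((k, v) for k, v in new_event.items() if k != "Message")
assert from_json(json.loads(json.dumps(old_event)))["message"] == "Unknown reason"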

Author: zsxwing zsxw...@gmail.com

Closes #3086 from zsxwing/SPARK-4163-back-comp and squashes the following 
commits:

21cb2a8 [zsxwing] Add a backward compatibility test for FetchFailed


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8412

Branch: refs/heads/master
Commit: 9bdc8412a0160e06e8182bd8b2f9bb65b478c590
Parents: 1a9c6cd
Author: zsxwing zsxw...@gmail.com
Authored: Mon Nov 3 22:40:43 2014 -0800
Committer: Aaron Davidson aa...@databricks.com
Committed: Mon Nov 3 22:40:43 2014 -0800

--
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala  | 11 +++
 1 file changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8412/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a91c9dd..0103012 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -177,6 +177,17 @@ class JsonProtocolSuite extends FunSuite {
   deserializedBmRemoved)
   }
 
+  test("FetchFailed backwards compatibility") {
+    // FetchFailed in Spark 1.1.0 does not have an "Message" property.
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+      "ignored")
+    val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
+      .removeField({ _._1 == "Message" })
+    val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+      "Unknown reason")
+    assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
+  }
+
   test("SparkListenerApplicationStart backwards compatibility") {
     // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
     val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")





git commit: [SPARK-4166][Core] Add a backward compatibility test for ExecutorLostFailure

2014-11-03 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 9bdc8412a -> b671ce047


[SPARK-4166][Core] Add a backward compatibility test for ExecutorLostFailure

Author: zsxwing zsxw...@gmail.com

Closes #3085 from zsxwing/SPARK-4166-back-comp and squashes the following 
commits:

89329f4 [zsxwing] Add a backward compatibility test for ExecutorLostFailure


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b671ce04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b671ce04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b671ce04

Branch: refs/heads/master
Commit: b671ce047d036b8923007902826038b01e836e8a
Parents: 9bdc841
Author: zsxwing zsxw...@gmail.com
Authored: Mon Nov 3 22:47:45 2014 -0800
Committer: Andrew Or and...@databricks.com
Committed: Mon Nov 3 22:47:45 2014 -0800

--
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b671ce04/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0103012..aec1e40 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -196,6 +196,15 @@ class JsonProtocolSuite extends FunSuite {
     assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
   }
 
+  test("ExecutorLostFailure backward compatibility") {
+    // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
+    val executorLostFailure = ExecutorLostFailure("100")
+    val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
+      .removeField({ _._1 == "Executor ID" })
+    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+    assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
+  }
+
   /** --------------------------- *
    | Helper test running methods |
    * ---------------------------- */





git commit: [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

2014-11-03 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master b671ce047 -> e4f42631a


[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by 
default.

This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.
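
The auto-batching strategy is simple: pickle items in batches, grow the batch
while serialized batches stay small, and shrink it when they get large, so
callers never pick a batch size by hand. A rough, illustrative sketch of the
idea (not PySpark's actual AutoBatchedSerializer; the 64 KB target and the
write format are placeholders):

import pickle
from itertools import islice

def dump_stream(iterator, stream, best_size=1 << 16):
    batch = 1
    iterator = iter(iterator)
    while True:
        items = list(islice(iterator, batch))
        if not items:
            break
        data = pickle.dumps(items, 2)
        stream.write(data)  # a real serializer would also write a length prefix
        if len(data) < best_size:
            batch *= 2      # batches serialize small; amortize per-batch overhead
        elif len(data) > best_size * 10 and batch > 1:
            batch //= 2     # batches got huge; back off

Because even a batch size of 1 goes through the same batched code path, the JVM
side no longer needs per-batch-size special cases, which is what lets this patch
drop the batchSize handling in WritableToJavaConverter further down in the diff.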

Author: Davies Liu dav...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Josh Rosen joshro...@databricks.com

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4f42631
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4f42631
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4f42631

Branch: refs/heads/master
Commit: e4f42631a68b473ce706429915f3f08042af2119
Parents: b671ce0
Author: Davies Liu dav...@databricks.com
Authored: Mon Nov 3 23:56:14 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Nov 3 23:56:14 2014 -0800

--
 .../spark/api/python/PythonHadoopUtil.scala |   6 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 110 +
 .../org/apache/spark/api/python/SerDeUtil.scala | 121 ++-
 .../WriteInputFormatTestDataGenerator.scala |  10 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   2 +-
 python/pyspark/context.py   |  58 +++--
 python/pyspark/mllib/common.py  |   2 +-
 python/pyspark/mllib/recommendation.py  |   2 +-
 python/pyspark/rdd.py   |  91 ++
 python/pyspark/serializers.py   |  36 ++
 python/pyspark/shuffle.py   |   7 +-
 python/pyspark/sql.py   |  18 ++-
 python/pyspark/tests.py |  66 ++
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  10 +-
 14 files changed, 201 insertions(+), 338 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f..5ba6617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
   map.put(convertWritable(k), convertWritable(v))
 }
 map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f42631/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125e..e94ccdc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import 

git commit: [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.

2014-11-03 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 8395e8fbd -> 786e75b33


[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by 
default.

This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu dav...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Josh Rosen joshro...@databricks.com

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.

(cherry picked from commit e4f42631a68b473ce706429915f3f08042af2119)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/786e75b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/786e75b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/786e75b3

Branch: refs/heads/branch-1.2
Commit: 786e75b33f0bc1445bfc289fe4b62407cb79026e
Parents: 8395e8f
Author: Davies Liu dav...@databricks.com
Authored: Mon Nov 3 23:56:14 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Nov 3 23:56:37 2014 -0800

--
 .../spark/api/python/PythonHadoopUtil.scala |   6 +-
 .../org/apache/spark/api/python/PythonRDD.scala | 110 +
 .../org/apache/spark/api/python/SerDeUtil.scala | 121 ++-
 .../WriteInputFormatTestDataGenerator.scala |  10 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   2 +-
 python/pyspark/context.py   |  58 +++--
 python/pyspark/mllib/common.py  |   2 +-
 python/pyspark/mllib/recommendation.py  |   2 +-
 python/pyspark/rdd.py   |  91 ++
 python/pyspark/serializers.py   |  36 ++
 python/pyspark/shuffle.py   |   7 +-
 python/pyspark/sql.py   |  18 ++-
 python/pyspark/tests.py |  66 ++
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  10 +-
 14 files changed, 201 insertions(+), 338 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/786e75b3/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f..5ba6617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -61,8 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]],
-    batchSize: Int) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -94,8 +93,7 @@ private[python] class WritableToJavaConverter(
   map.put(convertWritable(k), convertWritable(v))
 }
 map
-      case w: Writable =>
-        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
+      case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/786e75b3/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61b125e..e94ccdc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -22,12 +22,10 @@ import java.net._
 import java.util.{List => JList, ArrayList =>