spark git commit: [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 053d94fcf -> 4e0395ddb


[SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars

This PR removed the `outputFile` configuration from pom.xml and updated 
`tests.py` to search jars for both sbt build and maven build.

I ran `mvn -Pkinesis-asl -DskipTests clean install` locally and verified that the 
jars in my local repository were correct. I also ran the Python tests against the 
Maven build, and they all passed.
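
The jar lookup described above can be sketched roughly as follows. This is a minimal illustration, not the code from the patch: the function name `find_assembly_jars` is hypothetical, and the Maven pattern (`target/<prefix>_*.jar`) is an assumption, since the diff in this message is cut off before the Maven branch. Only the sbt pattern and the ignored suffixes are taken from the diff.

```python
import glob
import os

# Secondary artifacts that must not be mistaken for the assembly jar
# (same suffixes as listed in the patch).
IGNORED_JAR_SUFFIXES = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")


def find_assembly_jars(project_dir, name_prefix):
    """Return candidate jars for name_prefix from both build layouts (hypothetical helper)."""
    patterns = [
        # sbt layout, as shown in the patch: target/scala-<version>/<prefix>-*.jar
        os.path.join(project_dir, "target", "scala-*", name_prefix + "-*.jar"),
        # Maven layout: ASSUMED here to be target/<prefix>_*.jar
        os.path.join(project_dir, "target", name_prefix + "_*.jar"),
    ]
    jars = []
    for pattern in patterns:
        jars.extend(glob.glob(pattern))
    # str.endswith accepts a tuple, so one call filters all ignored suffixes.
    return sorted(j for j in jars if not j.endswith(IGNORED_JAR_SUFFIXES))
```

A caller would typically expect exactly one match and raise an error listing the paths it found otherwise, which is in the spirit of the squashed commit that adds the jar paths to the exception message.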

Author: zsxwing 

Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits:

e0b5818 [zsxwing] Fix the sbt build
c697627 [zsxwing] Add the jar pathes to the exception message
be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e0395dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e0395dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e0395dd

Branch: refs/heads/master
Commit: 4e0395ddb764d092b5b38447af49e196e590e0f0
Parents: 053d94f
Author: zsxwing 
Authored: Mon Aug 24 12:38:01 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 12:38:01 2015 -0700

--
 external/flume-assembly/pom.xml |  1 -
 external/kafka-assembly/pom.xml |  1 -
 external/mqtt-assembly/pom.xml  |  1 -
 extras/kinesis-asl-assembly/pom.xml |  1 -
 python/pyspark/streaming/tests.py   | 47 ++--
 5 files changed, 26 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index e05e431..561ed4b 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -115,7 +115,6 @@
 maven-shade-plugin
 
   false
-  <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
   
 
   *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 36342f3..6f4e2a8 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -142,7 +142,6 @@
   maven-shade-plugin
   
 false
-<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile>
 
   
 *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/mqtt-assembly/pom.xml
--
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
index f3e3f93..8412600 100644
--- a/external/mqtt-assembly/pom.xml
+++ b/external/mqtt-assembly/pom.xml
@@ -132,7 +132,6 @@
 maven-shade-plugin
 
   false
-  <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
   
 
   *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/extras/kinesis-asl-assembly/pom.xml
--
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
index 3ca5386..51af3e6 100644
--- a/extras/kinesis-asl-assembly/pom.xml
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -137,7 +137,6 @@
   maven-shade-plugin
   
 false
-<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
 
   
 *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 510a4f2..cfea95b 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase):
 kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
 
 
+# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because
+# the artifact jars are in different directories.
+def search_jar(dir, name_prefix):
+    # We should ignore the following jars
+    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+    jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) +  # sbt build
+            glob.glob(os.p

spark git commit: [SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent unnecessary classes from showing up in the docs

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 4e0395ddb -> 7478c8b66


[SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent 
unnecessary classes from showing up in the docs

In addition, some random cleanup of import ordering

Author: Tathagata Das 

Closes #8387 from tdas/SPARK-9791 and squashes the following commits:

67f3ee9 [Tathagata Das] Change private class to private[package] class to 
prevent them from showing up in the docs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7478c8b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7478c8b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7478c8b6

Branch: refs/heads/master
Commit: 7478c8b66d6a2b1179f20c38b49e27e37b0caec3
Parents: 4e0395d
Author: Tathagata Das 
Authored: Mon Aug 24 12:40:09 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 12:40:09 2015 -0700

--
 .../spark/streaming/flume/FlumeUtils.scala  |  2 +-
 .../apache/spark/streaming/kafka/Broker.scala   |  6 ++--
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 10 +++---
 .../spark/streaming/kafka/KafkaUtils.scala  | 36 +---
 .../spark/streaming/kafka/OffsetRange.scala |  8 -
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  6 ++--
 .../spark/streaming/mqtt/MQTTTestUtils.scala|  2 +-
 .../streaming/kinesis/KinesisTestUtils.scala|  2 +-
 .../spark/streaming/util/WriteAheadLog.java |  2 ++
 .../util/WriteAheadLogRecordHandle.java |  2 ++
 .../receiver/ReceivedBlockHandler.scala |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  2 +-
 .../apache/spark/streaming/ui/BatchPage.scala   |  2 +-
 13 files changed, 28 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
--
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 095bfb0..a65a9b9 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -247,7 +247,7 @@ object FlumeUtils {
  * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
  * function so that it can be easily instantiated and called from Python's FlumeUtils.
  */
-private class FlumeUtilsPythonHelper {
+private[flume] class FlumeUtilsPythonHelper {
 
   def createStream(
   jssc: JavaStreamingContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
index 5a74feb..9159051 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka
 import org.apache.spark.annotation.Experimental
 
 /**
- * :: Experimental ::
- * Represent the host and port info for a Kafka broker.
- * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
  */
-@Experimental
 final class Broker private(
 /** Broker's hostname */
 val host: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index b608b75..79a9db4 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
-import java.util.{Map => JMap}
-import java.util.Properties
 import java.util.concurrent.TimeoutException
+import java.util.{Map => JMap, Properties}
 
 import scala.annotation.tailrec
 import scala.language.postfixOps
@@

spark git commit: [SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent unnecessary classes from showing up in the docs

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 36bc50c8d -> d003373bd


[SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent 
unnecessary classes from showing up in the docs

In addition, some random cleanup of import ordering

Author: Tathagata Das 

Closes #8387 from tdas/SPARK-9791 and squashes the following commits:

67f3ee9 [Tathagata Das] Change private class to private[package] class to 
prevent them from showing up in the docs

(cherry picked from commit 7478c8b66d6a2b1179f20c38b49e27e37b0caec3)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d003373b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d003373b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d003373b

Branch: refs/heads/branch-1.5
Commit: d003373bd8557ed255125940f736e44f8722e8e3
Parents: 36bc50c
Author: Tathagata Das 
Authored: Mon Aug 24 12:40:09 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 12:40:23 2015 -0700

--
 .../spark/streaming/flume/FlumeUtils.scala  |  2 +-
 .../apache/spark/streaming/kafka/Broker.scala   |  6 ++--
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 10 +++---
 .../spark/streaming/kafka/KafkaUtils.scala  | 36 +---
 .../spark/streaming/kafka/OffsetRange.scala |  8 -
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  6 ++--
 .../spark/streaming/mqtt/MQTTTestUtils.scala|  2 +-
 .../streaming/kinesis/KinesisTestUtils.scala|  2 +-
 .../spark/streaming/util/WriteAheadLog.java |  2 ++
 .../util/WriteAheadLogRecordHandle.java |  2 ++
 .../receiver/ReceivedBlockHandler.scala |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  2 +-
 .../apache/spark/streaming/ui/BatchPage.scala   |  2 +-
 13 files changed, 28 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
--
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 095bfb0..a65a9b9 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -247,7 +247,7 @@ object FlumeUtils {
  * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
  * function so that it can be easily instantiated and called from Python's FlumeUtils.
  */
-private class FlumeUtilsPythonHelper {
+private[flume] class FlumeUtilsPythonHelper {
 
   def createStream(
   jssc: JavaStreamingContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
index 5a74feb..9159051 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka
 import org.apache.spark.annotation.Experimental
 
 /**
- * :: Experimental ::
- * Represent the host and port info for a Kafka broker.
- * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
  */
-@Experimental
 final class Broker private(
 /** Broker's hostname */
 val host: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index b608b75..79a9db4 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
-import java.util.{Map => JMap}
-import java.util.Properties
 import java.util.concurrent.TimeoutException
+import

spark git commit: [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b40059dbd -> 36bc50c8d


[SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars

This PR removed the `outputFile` configuration from pom.xml and updated 
`tests.py` to search jars for both sbt build and maven build.

I ran `mvn -Pkinesis-asl -DskipTests clean install` locally and verified that the 
jars in my local repository were correct. I also ran the Python tests against the 
Maven build, and they all passed.

Author: zsxwing 

Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits:

e0b5818 [zsxwing] Fix the sbt build
c697627 [zsxwing] Add the jar pathes to the exception message
be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars

(cherry picked from commit 4e0395ddb764d092b5b38447af49e196e590e0f0)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36bc50c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36bc50c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36bc50c8

Branch: refs/heads/branch-1.5
Commit: 36bc50c8d377f3e628f7d608d58a76ea508e9697
Parents: b40059d
Author: zsxwing 
Authored: Mon Aug 24 12:38:01 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 12:38:10 2015 -0700

--
 external/flume-assembly/pom.xml |  1 -
 external/kafka-assembly/pom.xml |  1 -
 external/mqtt-assembly/pom.xml  |  1 -
 extras/kinesis-asl-assembly/pom.xml |  1 -
 python/pyspark/streaming/tests.py   | 47 ++--
 5 files changed, 26 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index e05e431..561ed4b 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -115,7 +115,6 @@
 maven-shade-plugin
 
   false
-  <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
   
 
   *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 36342f3..6f4e2a8 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -142,7 +142,6 @@
   maven-shade-plugin
   
 false
-<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile>
 
   
 *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/mqtt-assembly/pom.xml
--
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
index f3e3f93..8412600 100644
--- a/external/mqtt-assembly/pom.xml
+++ b/external/mqtt-assembly/pom.xml
@@ -132,7 +132,6 @@
 maven-shade-plugin
 
   false
-  <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
   
 
   *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/extras/kinesis-asl-assembly/pom.xml
--
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
index 3ca5386..51af3e6 100644
--- a/extras/kinesis-asl-assembly/pom.xml
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -137,7 +137,6 @@
   maven-shade-plugin
   
 false
-<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
 
   
 *:*

http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 510a4f2..cfea95b 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase):
 kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
 
 
+# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because
+# the artifact jars are in different directories.
+def search_jar(dir, name_prefix):
+    # We should ignore the following jars
+    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+    jars = (glob.glo

spark git commit: [SPARK-10169] [SQL] [BRANCH-1.3] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate function

2015-08-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 a98603f8c -> 3d2eaf0a7


[SPARK-10169] [SQL] [BRANCH-1.3] Partial aggregation's plan is wrong when a 
grouping expression is used as an argument of the aggregate function

https://issues.apache.org/jira/browse/SPARK-10169

Author: Wenchen Fan 
Author: Yin Huai 

Closes #8380 from yhuai/aggTransformDown-branch1.3.
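
Why the traversal order matters can be shown with a small model. The sketch below is an illustration only and does not use Catalyst: expressions are plain Python tuples, the helpers `transform_up` and `transform_down` are hypothetical stand-ins for the Catalyst methods of similar names, and nodes are matched by structural equality rather than by `TreeNodeRef`.

```python
def transform_down(node, rule):
    """Apply rule to the node first, then recurse into the children of the result."""
    new = rule(node)
    if isinstance(new, tuple):
        return (new[0],) + tuple(transform_down(c, rule) for c in new[1:])
    return new


def transform_up(node, rule):
    """Rewrite the children first, then apply rule to the rebuilt node."""
    if isinstance(node, tuple):
        node = (node[0],) + tuple(transform_up(c, rule) for c in node[1:])
    return rule(node)


# sum(if(i % 10 = 5, 1, 0)) with grouping expression i % 10
grouping = ("mod", "i", 10)
aggregate = ("sum", ("if", ("eq", grouping, 5), 1, 0))

# Each known expression is replaced by a named result attribute.
replacements = {aggregate: "final_sum", grouping: "group_attr"}


def rule(node):
    return replacements.get(node, node)


top_down = transform_down(aggregate, rule)
bottom_up = transform_up(aggregate, rule)
```

In this model the top-down pass replaces the whole aggregate with `final_sum`. The bottom-up pass rewrites the nested `i % 10` to `group_attr` first, after which the enclosing `sum(...)` no longer equals the registered aggregate and is left in place, which mirrors the wrong plan described in the patch comment.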


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d2eaf0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d2eaf0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d2eaf0a

Branch: refs/heads/branch-1.3
Commit: 3d2eaf0a7701bfd9a41ba4c1b29e5be77156a9bf
Parents: a98603f
Author: Wenchen Fan 
Authored: Mon Aug 24 13:00:49 2015 -0700
Committer: Michael Armbrust 
Committed: Mon Aug 24 13:00:49 2015 -0700

--
 .../spark/sql/catalyst/planning/patterns.scala  | 14 +++--
 .../org/apache/spark/sql/SQLQuerySuite.scala| 22 
 2 files changed, 34 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3d2eaf0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 9c8c643..d0ebe24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -151,7 +151,10 @@ object PartialAggregation {
 
 // Replace aggregations with a new expression that computes the result from the already
 // computed partial evaluations and grouping values.
-val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
+// transformDown is needed at here because we want to match aggregate function first.
+// Otherwise, if a grouping expression is used as an argument of an aggregate function,
+// we will match grouping expression first and have a wrong plan.
+val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformDown {
   case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) =>
 partialEvaluations(new TreeNodeRef(e)).finalEvaluation
 
@@ -159,8 +162,15 @@ object PartialAggregation {
 // Should trim aliases around `GetField`s. These aliases are introduced while
 // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
 // (Should we just turn `GetField` into a `NamedExpression`?)
+def trimAliases(e: Expression): Expression =
+  e.transform { case Alias(g: GetField, _) => g }
+val trimmed = e match {
+  // Don't trim the top level Alias.
+  case Alias(child, name) => Alias(trimAliases(child), name)()
+  case _ => trimAliases(e)
+}
 namedGroupingExpressions
-  .get(e.transform { case Alias(g: GetField, _) => g })
+  .get(trimmed)
   .map(_.toAttribute)
   .getOrElse(e)
 }).asInstanceOf[Seq[NamedExpression]]

http://git-wip-us.apache.org/repos/asf/spark/blob/3d2eaf0a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 87e7cf8..b52b606 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1099,4 +1099,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
 checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1))
 checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1))
   }
+
+  test("SPARK-10169: grouping expressions used as arguments of aggregate functions.") {
+sqlCtx.sparkContext
+  .parallelize((1 to 1000), 50)
+  .map(i => Tuple1(i))
+  .toDF("i")
+  .registerTempTable("t")
+
+val query = sqlCtx.sql(
+  """
+|select i % 10, sum(if(i % 10 = 5, 1, 0)), count(i)
+|from t
+|where i % 10 = 5
+|group by i % 10
+  """.stripMargin)
+
+checkAnswer(
+  query,
+  Row(5, 100, 100))
+
+dropTempTable("t")
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10169] [SQL] [BRANCH-1.4] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate function

2015-08-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c73498773 -> 2671551a9


[SPARK-10169] [SQL] [BRANCH-1.4] Partial aggregation's plan is wrong when a 
grouping expression is used as an argument of the aggregate function

https://issues.apache.org/jira/browse/SPARK-10169

Author: Yin Huai 
Author: Wenchen Fan 

Closes #8379 from yhuai/aggTransformDown-branch1.4.
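
The second hunk of the patch below trims aliases that wrap field accesses while keeping the outermost alias. A rough model of that rule, assuming a hypothetical tuple encoding of expressions (`("alias", child, name)` and `("getfield", child, field)`) that is not part of Catalyst:

```python
def trim_aliases(expr):
    """Drop every alias that directly wraps a getfield node, at any depth."""
    if not isinstance(expr, tuple):
        return expr
    if expr[0] == "alias" and isinstance(expr[1], tuple) and expr[1][0] == "getfield":
        expr = expr[1]  # replace Alias(GetField, _) by the GetField itself
    return (expr[0],) + tuple(trim_aliases(c) for c in expr[1:])


def trim_keep_top(expr):
    """Same trimming, but the top-level alias (if any) is preserved."""
    if isinstance(expr, tuple) and expr[0] == "alias":
        _, child, name = expr
        return ("alias", trim_aliases(child), name)
    return trim_aliases(expr)


field = ("getfield", "a", "b")
top_aliased = ("alias", field, "c0")
nested = ("alias", ("add", ("alias", field, "x"), 1), "total")
```

With this encoding, `trim_aliases(top_aliased)` loses the name `c0`, whereas `trim_keep_top(top_aliased)` returns the expression unchanged, and `trim_keep_top(nested)` removes only the inner alias `x`.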


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2671551a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2671551a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2671551a

Branch: refs/heads/branch-1.4
Commit: 2671551a94f203bcfb3d0ab11e551c2f9865f4ea
Parents: c734987
Author: Yin Huai 
Authored: Mon Aug 24 13:02:06 2015 -0700
Committer: Michael Armbrust 
Committed: Mon Aug 24 13:02:06 2015 -0700

--
 .../spark/sql/catalyst/planning/patterns.scala  | 13 ++--
 .../org/apache/spark/sql/SQLQuerySuite.scala| 22 
 2 files changed, 33 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2671551a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 1dd75a8..c1b88d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -151,7 +151,10 @@ object PartialAggregation {
 
 // Replace aggregations with a new expression that computes the result from the already
 // computed partial evaluations and grouping values.
-val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
+// transformDown is needed at here because we want to match aggregate function first.
+// Otherwise, if a grouping expression is used as an argument of an aggregate function,
+// we will match grouping expression first and have a wrong plan.
+val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformDown {
   case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) =>
 partialEvaluations(new TreeNodeRef(e)).finalEvaluation
 
@@ -159,7 +162,13 @@ object PartialAggregation {
 // Should trim aliases around `GetField`s. These aliases are introduced while
 // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
 // (Should we just turn `GetField` into a `NamedExpression`?)
-val trimmed = e.transform { case Alias(g: ExtractValue, _) => g }
+def trimAliases(e: Expression): Expression =
+  e.transform { case Alias(g: ExtractValue, _) => g }
+val trimmed = e match {
+  // Don't trim the top level Alias.
+  case Alias(child, name) => Alias(trimAliases(child), name)()
+  case _ => trimAliases(e)
+}
 namedGroupingExpressions
   .find { case (k, v) => k semanticEquals trimmed }
   .map(_._2.toAttribute)

http://git-wip-us.apache.org/repos/asf/spark/blob/2671551a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8a0679e..1067b94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1335,4 +1335,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
 
 checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
   }
+
+  test("SPARK-10169: grouping expressions used as arguments of aggregate functions.") {
+sqlCtx.sparkContext
+  .parallelize((1 to 1000), 50)
+  .map(i => Tuple1(i))
+  .toDF("i")
+  .registerTempTable("t")
+
+val query = sqlCtx.sql(
+  """
+|select i % 10, sum(if(i % 10 = 5, 1, 0)), count(i)
+|from t
+|where i % 10 = 5
+|group by i % 10
+  """.stripMargin)
+
+checkAnswer(
+  query,
+  Row(5, 100, 100))
+
+dropTempTable("t")
+  }
 }





spark git commit: [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d003373bd -> 43dcf95e4


[SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions

This PR contains examples on how to use some of the Stat Functions available 
for DataFrames under `df.stat`.
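
For readers who want to check what `cov` and `corr` report, the quantities can be computed by hand. The sketch below is plain Python rather than Spark, the function names are hypothetical, and it assumes the sample (n - 1) definition of covariance; the random columns from the documentation examples are not reproduced, so small hand-made lists are used instead.

```python
import math


def sample_covariance(xs, ys):
    """Sample covariance of two equally long sequences (divides by n - 1)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / (n - 1)


def pearson_correlation(xs, ys):
    """Pearson correlation: covariance scaled by both standard deviations."""
    return sample_covariance(xs, ys) / math.sqrt(
        sample_covariance(xs, xs) * sample_covariance(ys, ys)
    )


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]  # exactly 2 * xs, so the correlation is 1
cov_xy = sample_covariance(xs, ys)
corr_xy = pearson_correlation(xs, ys)
```

Because the correlation divides out the scale of both columns, doubling `ys` changes `cov_xy` but leaves `corr_xy` at 1.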

rxin

Author: Burak Yavuz 

Closes #8378 from brkyvz/update-sql-docs.

(cherry picked from commit 9ce0c7ad333f4a3c01207e5e9ed42bcafb99d894)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43dcf95e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43dcf95e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43dcf95e

Branch: refs/heads/branch-1.5
Commit: 43dcf95e42eb77c7cd545179c461bb7f9430e0e3
Parents: d003373
Author: Burak Yavuz 
Authored: Mon Aug 24 13:48:01 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 13:48:09 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  |   2 +-
 .../spark/sql/DataFrameStatFunctions.scala  | 101 +++
 2 files changed, 102 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43dcf95e/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5bed299..ae341c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -684,7 +684,7 @@ class DataFrame private[sql](
   // make it a NamedExpression.
   case Column(u: UnresolvedAttribute) => UnresolvedAlias(u)
   case Column(expr: NamedExpression) => expr
-  // Leave an unaliased explode with an empty list of names since the analzyer will generate the
+  // Leave an unaliased explode with an empty list of names since the analyzer will generate the
   // correct defaults after the nested expression's type has been resolved.
   case Column(explode: Explode) => MultiAlias(explode, Nil)
   case Column(expr: Expression) => Alias(expr, expr.prettyString)()

http://git-wip-us.apache.org/repos/asf/spark/blob/43dcf95e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 2e68e35..69c9847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -39,6 +39,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* @param col2 the name of the second column
* @return the covariance of the two columns.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.cov("rand1", "rand2")
+   *res1: Double = 0.065...
+   * }}}
+   *
* @since 1.4.0
*/
   def cov(col1: String, col2: String): Double = {
@@ -54,6 +61,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* @param col2 the name of the column to calculate the correlation against
* @return The Pearson Correlation Coefficient as a Double.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.corr("rand1", "rand2")
+   *res1: Double = 0.613...
+   * }}}
+   *
* @since 1.4.0
*/
   def corr(col1: String, col2: String, method: String): Double = {
@@ -69,6 +83,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* @param col2 the name of the column to calculate the correlation against
* @return The Pearson Correlation Coefficient as a Double.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.corr("rand1", "rand2", "pearson")
+   *res1: Double = 0.613...
+   * }}}
+   *
* @since 1.4.0
*/
   def corr(col1: String, col2: String): Double = {
@@ -92,6 +113,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* of the DataFrame.
* @return A DataFrame containing for the contingency table.
*
+   * {{{
+   *val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2),
+   *  (3, 3))).toDF("key", "value")
+   *val ct = df.stat.crosstab("key", "value")
+   *ct.show()
+   *+-+---+---+---+
+   * 

spark git commit: [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 7478c8b66 -> 9ce0c7ad3


[SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions

This PR contains examples on how to use some of the Stat Functions available 
for DataFrames under `df.stat`.
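
To make the two numeric statistics in these examples concrete, here is a minimal sketch of sample covariance and the Pearson correlation coefficient on plain Scala collections. It is not Spark's implementation: `StatSketch` and its methods are hypothetical names, and the (n - 1) denominator is an assumption about which covariance definition is meant.

```scala
object StatSketch {
  // Sample covariance of two equally sized sequences (assumes the n - 1 denominator).
  def cov(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.length == ys.length && xs.length > 1, "need two sequences of equal length > 1")
    val meanX = xs.sum / xs.length
    val meanY = ys.sum / ys.length
    val products = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }
    products.sum / (xs.length - 1)
  }

  // Pearson correlation: covariance normalised by the two standard deviations.
  def corr(xs: Seq[Double], ys: Seq[Double]): Double =
    cov(xs, ys) / math.sqrt(cov(xs, xs) * cov(ys, ys))
}
```

For instance, `StatSketch.corr(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0))` is 1 up to floating-point rounding, since the second sequence is an exact positive multiple of the first.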

rxin

Author: Burak Yavuz 

Closes #8378 from brkyvz/update-sql-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ce0c7ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ce0c7ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ce0c7ad

Branch: refs/heads/master
Commit: 9ce0c7ad333f4a3c01207e5e9ed42bcafb99d894
Parents: 7478c8b
Author: Burak Yavuz 
Authored: Mon Aug 24 13:48:01 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 13:48:01 2015 -0700

--
 .../scala/org/apache/spark/sql/DataFrame.scala  |   2 +-
 .../spark/sql/DataFrameStatFunctions.scala  | 101 +++
 2 files changed, 102 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ce0c7ad/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d6688b2..791c10c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -684,7 +684,7 @@ class DataFrame private[sql](
   // make it a NamedExpression.
   case Column(u: UnresolvedAttribute) => UnresolvedAlias(u)
   case Column(expr: NamedExpression) => expr
-  // Leave an unaliased explode with an empty list of names since the 
analzyer will generate the
+  // Leave an unaliased explode with an empty list of names since the 
analyzer will generate the
   // correct defaults after the nested expression's type has been resolved.
   case Column(explode: Explode) => MultiAlias(explode, Nil)
   case Column(expr: Expression) => Alias(expr, expr.prettyString)()

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce0c7ad/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 2e68e35..69c9847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -39,6 +39,13 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
* @param col2 the name of the second column
* @return the covariance of the two columns.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", 
rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.cov("rand1", "rand2")
+   *res1: Double = 0.065...
+   * }}}
+   *
* @since 1.4.0
*/
   def cov(col1: String, col2: String): Double = {
@@ -54,6 +61,13 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
* @param col2 the name of the column to calculate the correlation against
* @return The Pearson Correlation Coefficient as a Double.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", 
rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.corr("rand1", "rand2")
+   *res1: Double = 0.613...
+   * }}}
+   *
* @since 1.4.0
*/
   def corr(col1: String, col2: String, method: String): Double = {
@@ -69,6 +83,13 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
* @param col2 the name of the column to calculate the correlation against
* @return The Pearson Correlation Coefficient as a Double.
*
+   * {{{
+   *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", 
rand(seed=10))
+   *  .withColumn("rand2", rand(seed=27))
+   *df.stat.corr("rand1", "rand2", "pearson")
+   *res1: Double = 0.613...
+   * }}}
+   *
* @since 1.4.0
*/
   def corr(col1: String, col2: String): Double = {
@@ -92,6 +113,20 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
* of the DataFrame.
* @return A DataFrame containing for the contingency table.
*
+   * {{{
+   *val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 
1), (2, 3), (3, 2),
+   *  (3, 3))).toDF("key", "value")
+   *val ct = df.stat.crosstab("key", "value")
+   *ct.show()
+   *+-+---+---+---+
+   *|key_value|  1|  2|  3|
+   *+-+---+---+---+
+   *|2|  2|  0|  1|
+   *|   

Git Push Summary

2015-08-24 Thread rxin
Repository: spark
Updated Tags:  refs/tags/v1.5.0-rc2 [deleted] e2569282a

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10144] [UI] Actually show peak execution memory by default

2015-08-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 43dcf95e4 -> 831f78ee5


[SPARK-10144] [UI] Actually show peak execution memory by default

The peak execution memory metric was introduced in SPARK-8735. That was before 
Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` 
must be explicitly set to true. The result is that the memory is not displayed 
by default.
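
The difference between the old and new lookups can be sketched without Spark. The snippet below uses a plain `Map[String, String]` as a stand-in for `SparkConf` (an assumption for illustration; `PeakMemoryDefault`, `displayOld` and `displayNew` are hypothetical names), and mirrors `getOption(...).exists(_.toBoolean)` versus a lookup that defaults to `true`.

```scala
object PeakMemoryDefault {
  // Stand-in for SparkConf: an immutable map of string settings (assumption).
  type Conf = Map[String, String]
  val Key = "spark.sql.unsafe.enabled"

  // Old behaviour: an unset key gives None, and None.exists(...) is always false.
  def displayOld(conf: Conf): Boolean =
    conf.get(Key).exists(_.toBoolean)

  // New behaviour: an unset key falls back to the default value of true.
  def displayNew(conf: Conf): Boolean =
    conf.get(Key).map(_.toBoolean).getOrElse(true)
}
```

With an empty configuration, `displayOld` returns `false` while `displayNew` returns `true`; when the key is set explicitly, both agree.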

Author: Andrew Or 

Closes #8345 from andrewor14/show-memory-default.

(cherry picked from commit 662bb9667669cb07cf6d2ccee0d8e76bb561cd89)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/831f78ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/831f78ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/831f78ee

Branch: refs/heads/branch-1.5
Commit: 831f78ee5d2deed9b529214b2613c7e972453514
Parents: 43dcf95
Author: Andrew Or 
Authored: Mon Aug 24 14:10:50 2015 -0700
Committer: Yin Huai 
Committed: Mon Aug 24 14:11:03 2015 -0700

--
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++
 core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/831f78ee/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fb4556b..4adc659 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private val displayPeakExecutionMemory =
-    parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
+  private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
 
   def render(request: HttpServletRequest): Seq[Node] = {
 progressListener.synchronized {
@@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable(
 desc: Boolean) extends PagedTable[TaskTableRowData] {
 
   // We only track peak memory used for unsafe operators
-  private val displayPeakExecutionMemory =
-    conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
+  private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
 
   override def tableId: String = "task-table"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/831f78ee/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 98f9314..3388c6d 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("peak execution memory only displayed if unsafe is enabled") {
 val unsafeConf = "spark.sql.unsafe.enabled"
-val conf = new SparkConf().set(unsafeConf, "true")
+val conf = new SparkConf(false).set(unsafeConf, "true")
 val html = renderStagePage(conf).toString().toLowerCase
 val targetString = "peak execution memory"
 assert(html.contains(targetString))
 // Disable unsafe and make sure it's not there
-val conf2 = new SparkConf().set(unsafeConf, "false")
+val conf2 = new SparkConf(false).set(unsafeConf, "false")
 val html2 = renderStagePage(conf2).toString().toLowerCase
 assert(!html2.contains(targetString))
+// Avoid setting anything; it should be displayed by default
+val conf3 = new SparkConf(false)
+val html3 = renderStagePage(conf3).toString().toLowerCase
+assert(html3.contains(targetString))
   }
 
   /**





spark git commit: [SPARK-10144] [UI] Actually show peak execution memory by default

2015-08-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 9ce0c7ad3 -> 662bb9667


[SPARK-10144] [UI] Actually show peak execution memory by default

The peak execution memory metric was introduced in SPARK-8735. That was before 
Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` 
must be explicitly set to true. The result is that the memory is not displayed 
by default.

Author: Andrew Or 

Closes #8345 from andrewor14/show-memory-default.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/662bb966
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/662bb966
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/662bb966

Branch: refs/heads/master
Commit: 662bb9667669cb07cf6d2ccee0d8e76bb561cd89
Parents: 9ce0c7a
Author: Andrew Or 
Authored: Mon Aug 24 14:10:50 2015 -0700
Committer: Yin Huai 
Committed: Mon Aug 24 14:10:50 2015 -0700

--
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++
 core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/662bb966/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fb4556b..4adc659 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private val displayPeakExecutionMemory =
-    parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
+  private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
 
   def render(request: HttpServletRequest): Seq[Node] = {
 progressListener.synchronized {
@@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable(
 desc: Boolean) extends PagedTable[TaskTableRowData] {
 
   // We only track peak memory used for unsafe operators
-  private val displayPeakExecutionMemory =
-    conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
+  private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
 
   override def tableId: String = "task-table"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/662bb966/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 98f9314..3388c6d 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("peak execution memory only displayed if unsafe is enabled") {
 val unsafeConf = "spark.sql.unsafe.enabled"
-val conf = new SparkConf().set(unsafeConf, "true")
+val conf = new SparkConf(false).set(unsafeConf, "true")
 val html = renderStagePage(conf).toString().toLowerCase
 val targetString = "peak execution memory"
 assert(html.contains(targetString))
 // Disable unsafe and make sure it's not there
-val conf2 = new SparkConf().set(unsafeConf, "false")
+val conf2 = new SparkConf(false).set(unsafeConf, "false")
 val html2 = renderStagePage(conf2).toString().toLowerCase
 assert(!html2.contains(targetString))
+// Avoid setting anything; it should be displayed by default
+val conf3 = new SparkConf(false)
+val html3 = renderStagePage(conf3).toString().toLowerCase
+assert(html3.contains(targetString))
   }
 
   /**





spark git commit: [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases

2015-08-24 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 662bb9667 -> a2f4cdceb


[SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test 
cases

This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add 
new test cases.

While working on this, I hit two bugs, SPARK-10177 and HIVE-11625. Test cases 
for both are added and marked as ignored for now. SPARK-10177 will be addressed in 
a separate PR.
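
As a rough illustration of why the refactoring makes new cases easier to add, the sketch below builds the `CREATE TABLE` statement from a list of Hive type names, following the `zipWithIndex` pattern visible in the diff. `DdlSketch` and `createTableDdl` are hypothetical names, and the `LOCATION` clause is left out to keep the example self-contained.

```scala
object DdlSketch {
  // Builds a CREATE TABLE statement with one generated column (col_0, col_1, ...) per Hive type.
  def createTableDdl(table: String, hiveTypes: Seq[String]): String = {
    val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  col_$index $typ" }
    s"""CREATE TABLE $table(
       |${fields.mkString(",\n")}
       |)
       |STORED AS PARQUET""".stripMargin
  }
}
```

A new test case then only needs to supply a row and the matching type names, for example `DdlSketch.createTableDdl("parquet_compat", Seq("INT", "STRING"))`, instead of editing a hand-written column list.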

Author: Cheng Lian 

Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2f4cdce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2f4cdce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2f4cdce

Branch: refs/heads/master
Commit: a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2
Parents: 662bb96
Author: Cheng Lian 
Authored: Mon Aug 24 14:11:19 2015 -0700
Committer: Davies Liu 
Committed: Mon Aug 24 14:11:19 2015 -0700

--
 .../hive/ParquetHiveCompatibilitySuite.scala| 132 +--
 1 file changed, 93 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a2f4cdce/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 13452e7..bc30180 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import 
org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with 
BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends 
ParquetCompatibilityTest {
*/
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  protected override def beforeAll(): Unit = {
+TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+TimeZone.setDefault(originalTimeZone)
+Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+val schema = readParquetSchema(path, { path =>
+  !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+})
+
+logInfo(
+  s"""Schema of the Parquet file written by parquet-avro:
+ |$schema
+   """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit 
= {
 withTable("parquet_compat") {
   withTempPath { dir =>
 val path = dir.getCanonicalPath
 
+// Hive columns are always nullable, so here we append a all-null row.
+val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+// Don't convert Hive metastore Parquet tables to let Hive write those 
Parquet files.
 withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
   withTempTable("data") {
-sqlContext.sql(
+val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  
col_$index $typ" }
+
+val ddl =
   s"""CREATE TABLE parquet_compat(
- |  bool_column BOOLEAN,
- |  byte_column TINYINT,
- |  short_column SMALLINT,
- |  int_column INT,
- |  long_column BIGINT,
- |  float_column FLOAT,
- |  double_column DOUBLE,
- |
- |  strings_column ARRAY<STRING>,
- |  int_to_string_column MAP<INT, STRING>
+ |${fields.mkString(",\n")}
  |)
  |STORED AS PARQUET
  |LOCATION '$path'
+   """.stripMargin
+
+logInfo(
+  s"""Cr

spark git commit: [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases

2015-08-24 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 831f78ee5 -> d36f3517c


[SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test 
cases

This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add 
new test cases.

While working on this, I hit two bugs, SPARK-10177 and HIVE-11625. Test cases 
for both are added and marked as ignored for now. SPARK-10177 will be addressed in 
a separate PR.

Author: Cheng Lian 

Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests.

(cherry picked from commit a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d36f3517
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d36f3517
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d36f3517

Branch: refs/heads/branch-1.5
Commit: d36f3517c8ddd8f9b5f05d0634dc2d49448200d9
Parents: 831f78e
Author: Cheng Lian 
Authored: Mon Aug 24 14:11:19 2015 -0700
Committer: Davies Liu 
Committed: Mon Aug 24 14:11:30 2015 -0700

--
 .../hive/ParquetHiveCompatibilitySuite.scala| 132 +--
 1 file changed, 93 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d36f3517/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 13452e7..bc30180 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import 
org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with 
BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends 
ParquetCompatibilityTest {
*/
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  protected override def beforeAll(): Unit = {
+TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+TimeZone.setDefault(originalTimeZone)
+Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+val schema = readParquetSchema(path, { path =>
+  !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+})
+
+logInfo(
+  s"""Schema of the Parquet file written by parquet-avro:
+ |$schema
+   """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit 
= {
 withTable("parquet_compat") {
   withTempPath { dir =>
 val path = dir.getCanonicalPath
 
+// Hive columns are always nullable, so here we append a all-null row.
+val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+// Don't convert Hive metastore Parquet tables to let Hive write those 
Parquet files.
 withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
   withTempTable("data") {
-sqlContext.sql(
+val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  
col_$index $typ" }
+
+val ddl =
   s"""CREATE TABLE parquet_compat(
- |  bool_column BOOLEAN,
- |  byte_column TINYINT,
- |  short_column SMALLINT,
- |  int_column INT,
- |  long_column BIGINT,
- |  float_column FLOAT,
- |  double_column DOUBLE,
- |
- |  strings_column ARRAY<STRING>,
- |  int_to_string_column MAP<INT, STRING>
+ |${fields.mkString(",\n")}
  |)
  |STORED AS PARQUET

spark git commit: [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?

2015-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d36f3517c -> 92234439d


[SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?

Move the tests in the `test.org.apache.spark.sql.hive` package to the apparently 
intended `org.apache.spark.sql.hive` package, as they are not meant to test 
behavior from outside `org.apache.spark.*`.

Alternate take, per discussion at https://github.com/apache/spark/pull/8051
I think this is what vanzin and I had in mind but also CC rxin to cross-check, 
as this does indeed depend on whether these tests were accidentally in this 
package or not. Testing from a `test.org.apache.spark` package is legitimate 
but didn't seem to be the intent here.

Author: Sean Owen 

Closes #8307 from srowen/SPARK-9758.

(cherry picked from commit cb2d2e15844d7ae34b5dd7028b55e11586ed93fa)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92234439
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92234439
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92234439

Branch: refs/heads/branch-1.5
Commit: 92234439d86044a3ec9f198c3b13ec20c763393d
Parents: d36f351
Author: Sean Owen 
Authored: Mon Aug 24 22:35:21 2015 +0100
Committer: Sean Owen 
Committed: Mon Aug 24 22:35:31 2015 +0100

--
 .../spark/sql/hive/JavaDataFrameSuite.java  | 104 
 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 162 ++
 .../spark/sql/hive/aggregate/MyDoubleAvg.java   | 129 +++
 .../spark/sql/hive/aggregate/MyDoubleSum.java   | 118 ++
 .../sql/hive/execution/UDFIntegerToString.java  |  26 +++
 .../sql/hive/execution/UDFListListInt.java  |  47 ++
 .../spark/sql/hive/execution/UDFListString.java |  38 +
 .../sql/hive/execution/UDFStringString.java |  26 +++
 .../sql/hive/execution/UDFTwoListList.java  |  28 
 .../spark/sql/hive/JavaDataFrameSuite.java  | 106 
 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 163 ---
 .../spark/sql/hive/aggregate/MyDoubleAvg.java   | 129 ---
 .../spark/sql/hive/aggregate/MyDoubleSum.java   | 118 --
 .../sql/hive/execution/UDFIntegerToString.java  |  26 ---
 .../sql/hive/execution/UDFListListInt.java  |  47 --
 .../spark/sql/hive/execution/UDFListString.java |  38 -
 .../sql/hive/execution/UDFStringString.java |  26 ---
 .../sql/hive/execution/UDFTwoListList.java  |  28 
 .../hive/execution/AggregationQuerySuite.scala  |   2 +-
 19 files changed, 679 insertions(+), 682 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/92234439/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
--
diff --git 
a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java 
b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
new file mode 100644
index 000..019d8a3
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
+import static org.apache.spark.sql.functions.*;
+import org.apache.spark.sql.hive.test.TestHive$;
+import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
+
+public class JavaDataFrameSuite {
+  private transient JavaSparkContext sc;
+  private transient HiveContext hc;
+
+  DataFrame df;
+
+  private void checkAnswer(DataFrame actual, List<Row> expected) {
+String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+if (errorMessage != null) {
+  Assert.fai

spark git commit: [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?

2015-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a2f4cdceb -> cb2d2e158


[SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?

Move the tests in the `test.org.apache.spark.sql.hive` package to the apparently 
intended `org.apache.spark.sql.hive` package, as they are not meant to test 
behavior from outside `org.apache.spark.*`.

Alternate take, per discussion at https://github.com/apache/spark/pull/8051
I think this is what vanzin and I had in mind but also CC rxin to cross-check, 
as this does indeed depend on whether these tests were accidentally in this 
package or not. Testing from a `test.org.apache.spark` package is legitimate 
but didn't seem to be the intent here.

Author: Sean Owen 

Closes #8307 from srowen/SPARK-9758.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb2d2e15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb2d2e15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb2d2e15

Branch: refs/heads/master
Commit: cb2d2e15844d7ae34b5dd7028b55e11586ed93fa
Parents: a2f4cdc
Author: Sean Owen 
Authored: Mon Aug 24 22:35:21 2015 +0100
Committer: Sean Owen 
Committed: Mon Aug 24 22:35:21 2015 +0100

--
 .../spark/sql/hive/JavaDataFrameSuite.java  | 104 
 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 162 ++
 .../spark/sql/hive/aggregate/MyDoubleAvg.java   | 129 +++
 .../spark/sql/hive/aggregate/MyDoubleSum.java   | 118 ++
 .../sql/hive/execution/UDFIntegerToString.java  |  26 +++
 .../sql/hive/execution/UDFListListInt.java  |  47 ++
 .../spark/sql/hive/execution/UDFListString.java |  38 +
 .../sql/hive/execution/UDFStringString.java |  26 +++
 .../sql/hive/execution/UDFTwoListList.java  |  28 
 .../spark/sql/hive/JavaDataFrameSuite.java  | 106 
 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 163 ---
 .../spark/sql/hive/aggregate/MyDoubleAvg.java   | 129 ---
 .../spark/sql/hive/aggregate/MyDoubleSum.java   | 118 --
 .../sql/hive/execution/UDFIntegerToString.java  |  26 ---
 .../sql/hive/execution/UDFListListInt.java  |  47 --
 .../spark/sql/hive/execution/UDFListString.java |  38 -
 .../sql/hive/execution/UDFStringString.java |  26 ---
 .../sql/hive/execution/UDFTwoListList.java  |  28 
 .../hive/execution/AggregationQuerySuite.scala  |   2 +-
 19 files changed, 679 insertions(+), 682 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
--
diff --git 
a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java 
b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
new file mode 100644
index 000..019d8a3
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
+import static org.apache.spark.sql.functions.*;
+import org.apache.spark.sql.hive.test.TestHive$;
+import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
+
+public class JavaDataFrameSuite {
+  private transient JavaSparkContext sc;
+  private transient HiveContext hc;
+
+  DataFrame df;
+
+  private void checkAnswer(DataFrame actual, List<Row> expected) {
+String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+if (errorMessage != null) {
+  Assert.fail(errorMessage);
+}
+  }
+
+  @Before
+  public void setUp() throws IOException {
+hc = TestHive

spark git commit: [SPARK-10061] [DOC] ML ensemble docs

2015-08-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 92234439d -> aadb9de4c


[SPARK-10061] [DOC] ML ensemble docs

User guide for spark.ml GBTs and Random Forests.
The examples are copied from the decision tree guide and modified to run.

I caught some issues I had somehow missed in the tree guide as well.

I have run all examples, including Java ones.  (Of course, I thought I had 
previously as well...)

CC: mengxr manishamde yanboliang

Author: Joseph K. Bradley 

Closes #8369 from jkbradley/ml-ensemble-docs.

(cherry picked from commit 13db11cb08eb90eb0ea3402c9fe0270aa282f247)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aadb9de4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aadb9de4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aadb9de4

Branch: refs/heads/branch-1.5
Commit: aadb9de4ce81db420cac0400b6de5bcc82c4ebe4
Parents: 9223443
Author: Joseph K. Bradley 
Authored: Mon Aug 24 15:38:54 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 24 15:39:01 2015 -0700

--
 docs/ml-decision-tree.md |  75 ++--
 docs/ml-ensembles.md | 952 +-
 2 files changed, 976 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aadb9de4/docs/ml-decision-tree.md
--
diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md
index 958c6f5..542819e 100644
--- a/docs/ml-decision-tree.md
+++ b/docs/ml-decision-tree.md
@@ -30,7 +30,7 @@ The Pipelines API for Decision Trees offers a bit more 
functionality than the or
 
 Ensembles of trees (Random Forests and Gradient-Boosted Trees) are described 
in the [Ensembles guide](ml-ensembles.html).
 
-# Inputs and Outputs (Predictions)
+# Inputs and Outputs
 
 We list the input and output (prediction) column types here.
 All output columns are optional; to exclude an output column, set its 
corresponding Param to an empty string.
@@ -234,7 +234,7 @@ IndexToString labelConverter = new IndexToString()
 
 // Chain indexers and tree in a Pipeline
 Pipeline pipeline = new Pipeline()
-  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, 
labelConverter});
+  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, dt, 
labelConverter});
 
 // Train model.  This also runs the indexers.
 PipelineModel model = pipeline.fit(trainingData);
@@ -315,10 +315,13 @@ print treeModel # summary only
 
 ## Regression
 
+The following examples load a dataset in LibSVM format, split it into training 
and test sets, train on the first dataset, and then evaluate on the held-out 
test set.
+We use a feature transformer to index categorical features, adding metadata to 
the `DataFrame` which the Decision Tree algorithm can recognize.
+
 
 
 
-More details on parameters can be found in the [Scala API 
documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier).
+More details on parameters can be found in the [Scala API 
documentation](api/scala/index.html#org.apache.spark.ml.regression.DecisionTreeRegressor).
 
 {% highlight scala %}
 import org.apache.spark.ml.Pipeline
@@ -347,7 +350,7 @@ val dt = new DecisionTreeRegressor()
   .setLabelCol("label")
   .setFeaturesCol("indexedFeatures")
 
-// Chain indexers and tree in a Pipeline
+// Chain indexer and tree in a Pipeline
 val pipeline = new Pipeline()
   .setStages(Array(featureIndexer, dt))
 
@@ -365,9 +368,7 @@ val evaluator = new RegressionEvaluator()
   .setLabelCol("label")
   .setPredictionCol("prediction")
   .setMetricName("rmse")
-// We negate the RMSE value since RegressionEvalutor returns negated RMSE
-// (since evaluation metrics are meant to be maximized by CrossValidator).
-val rmse = - evaluator.evaluate(predictions)
+val rmse = evaluator.evaluate(predictions)
 println("Root Mean Squared Error (RMSE) on test data = " + rmse)
 
 val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
@@ -377,14 +378,15 @@ println("Learned regression tree model:\n" + 
treeModel.toDebugString)
 
 
 
-More details on parameters can be found in the [Java API 
documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html).
+More details on parameters can be found in the [Java API 
documentation](api/java/org/apache/spark/ml/regression/DecisionTreeRegressor.html).
 
 {% highlight java %}
 import org.apache.spark.ml.Pipeline;
 import org.apache.spark.ml.PipelineModel;
 import org.apache.spark.ml.PipelineStage;
 import org.apache.spark.ml.evaluation.RegressionEvaluator;
-import org.apache.spark.ml.feature.*;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
 import org.apache.spark.ml.regression.DecisionTreeR

spark git commit: [SPARK-10061] [DOC] ML ensemble docs

2015-08-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master cb2d2e158 -> 13db11cb0


[SPARK-10061] [DOC] ML ensemble docs

User guide for spark.ml GBTs and Random Forests.
The examples are copied from the decision tree guide and modified to run.

I caught some issues I had somehow missed in the tree guide as well.

I have run all examples, including Java ones.  (Of course, I thought I had 
previously as well...)

CC: mengxr manishamde yanboliang

Author: Joseph K. Bradley 

Closes #8369 from jkbradley/ml-ensemble-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13db11cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13db11cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13db11cb

Branch: refs/heads/master
Commit: 13db11cb08eb90eb0ea3402c9fe0270aa282f247
Parents: cb2d2e1
Author: Joseph K. Bradley 
Authored: Mon Aug 24 15:38:54 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 24 15:38:54 2015 -0700

--
 docs/ml-decision-tree.md |  75 ++--
 docs/ml-ensembles.md | 952 +-
 2 files changed, 976 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/13db11cb/docs/ml-decision-tree.md
--
diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md
index 958c6f5..542819e 100644
--- a/docs/ml-decision-tree.md
+++ b/docs/ml-decision-tree.md
@@ -30,7 +30,7 @@ The Pipelines API for Decision Trees offers a bit more 
functionality than the or
 
 Ensembles of trees (Random Forests and Gradient-Boosted Trees) are described 
in the [Ensembles guide](ml-ensembles.html).
 
-# Inputs and Outputs (Predictions)
+# Inputs and Outputs
 
 We list the input and output (prediction) column types here.
 All output columns are optional; to exclude an output column, set its 
corresponding Param to an empty string.
@@ -234,7 +234,7 @@ IndexToString labelConverter = new IndexToString()
 
 // Chain indexers and tree in a Pipeline
 Pipeline pipeline = new Pipeline()
-  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, 
labelConverter});
+  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, dt, 
labelConverter});
 
 // Train model.  This also runs the indexers.
 PipelineModel model = pipeline.fit(trainingData);
@@ -315,10 +315,13 @@ print treeModel # summary only
 
 ## Regression
 
+The following examples load a dataset in LibSVM format, split it into training 
and test sets, train on the first dataset, and then evaluate on the held-out 
test set.
+We use a feature transformer to index categorical features, adding metadata to 
the `DataFrame` which the Decision Tree algorithm can recognize.
+
 
 
 
-More details on parameters can be found in the [Scala API 
documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier).
+More details on parameters can be found in the [Scala API 
documentation](api/scala/index.html#org.apache.spark.ml.regression.DecisionTreeRegressor).
 
 {% highlight scala %}
 import org.apache.spark.ml.Pipeline
@@ -347,7 +350,7 @@ val dt = new DecisionTreeRegressor()
   .setLabelCol("label")
   .setFeaturesCol("indexedFeatures")
 
-// Chain indexers and tree in a Pipeline
+// Chain indexer and tree in a Pipeline
 val pipeline = new Pipeline()
   .setStages(Array(featureIndexer, dt))
 
@@ -365,9 +368,7 @@ val evaluator = new RegressionEvaluator()
   .setLabelCol("label")
   .setPredictionCol("prediction")
   .setMetricName("rmse")
-// We negate the RMSE value since RegressionEvalutor returns negated RMSE
-// (since evaluation metrics are meant to be maximized by CrossValidator).
-val rmse = - evaluator.evaluate(predictions)
+val rmse = evaluator.evaluate(predictions)
 println("Root Mean Squared Error (RMSE) on test data = " + rmse)
 
 val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
@@ -377,14 +378,15 @@ println("Learned regression tree model:\n" + 
treeModel.toDebugString)
 
 
 
-More details on parameters can be found in the [Java API 
documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html).
+More details on parameters can be found in the [Java API 
documentation](api/java/org/apache/spark/ml/regression/DecisionTreeRegressor.html).
 
 {% highlight java %}
 import org.apache.spark.ml.Pipeline;
 import org.apache.spark.ml.PipelineModel;
 import org.apache.spark.ml.PipelineStage;
 import org.apache.spark.ml.evaluation.RegressionEvaluator;
-import org.apache.spark.ml.feature.*;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
 import org.apache.spark.ml.regression.DecisionTreeRegressor;
 import org.apache.spark.mllib

spark git commit: [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 aadb9de4c -> a4bad5f25


[SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter

This adds a missing null check to the Decimal `toScala` converter in 
`CatalystTypeConverters`, fixing an NPE.
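
The pattern behind the fix can be sketched in Python as a rough analogue, not as Spark's actual code. `CatalystDecimal` and `to_external` are hypothetical names standing in for Catalyst's `Decimal` and the `toScala` converter; the point is only that the conversion is guarded by a null check.

```python
from decimal import Decimal
from typing import Optional


class CatalystDecimal:
    """Hypothetical stand-in for Catalyst's internal Decimal wrapper."""

    def __init__(self, unscaled: int, scale: int) -> None:
        self.unscaled = unscaled
        self.scale = scale

    def to_big_decimal(self) -> Decimal:
        # Shift the unscaled integer right by `scale` decimal places.
        return Decimal(self.unscaled).scaleb(-self.scale)


def to_external(value: Optional[CatalystDecimal]) -> Optional[Decimal]:
    # Without this guard, a null input would fail on the method call below,
    # which is the Python counterpart of the NPE described above.
    if value is None:
        return None
    return value.to_big_decimal()
```

With the guard in place, `to_external(None)` returns `None` instead of raising.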

Author: Josh Rosen 

Closes #8401 from JoshRosen/SPARK-10190.

(cherry picked from commit d7b4c095271c36fcc7f9ded267ecf5ec66fac803)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4bad5f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4bad5f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4bad5f2

Branch: refs/heads/branch-1.5
Commit: a4bad5f25ed41821e36ecab23ec686fcb6071deb
Parents: aadb9de
Author: Josh Rosen 
Authored: Mon Aug 24 16:17:45 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 16:17:52 2015 -0700

--
 .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala  | 5 -
 .../apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a4bad5f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 8d0c64e..966623e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -329,7 +329,10 @@ object CatalystTypeConverters {
 null
   }
 }
-override def toScala(catalystValue: Decimal): JavaBigDecimal = 
catalystValue.toJavaBigDecimal
+override def toScala(catalystValue: Decimal): JavaBigDecimal = {
+  if (catalystValue == null) null
+  else catalystValue.toJavaBigDecimal
+}
 override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
   row.getDecimal(column, dataType.precision, 
dataType.scale).toJavaBigDecimal
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4bad5f2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index df0f045..03bb102 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -32,7 +32,9 @@ class CatalystTypeConvertersSuite extends SparkFunSuite {
 IntegerType,
 LongType,
 FloatType,
-DoubleType)
+DoubleType,
+DecimalType.SYSTEM_DEFAULT,
+DecimalType.USER_DEFAULT)
 
   test("null handling in rows") {
 val schema = StructType(simpleTypes.map(t => 
StructField(t.getClass.getName, t)))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 13db11cb0 -> d7b4c0952


[SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter

This adds a missing null check to the Decimal `toScala` converter in 
`CatalystTypeConverters`, fixing an NPE.

Author: Josh Rosen 

Closes #8401 from JoshRosen/SPARK-10190.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7b4c095
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7b4c095
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7b4c095

Branch: refs/heads/master
Commit: d7b4c095271c36fcc7f9ded267ecf5ec66fac803
Parents: 13db11c
Author: Josh Rosen 
Authored: Mon Aug 24 16:17:45 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 16:17:45 2015 -0700

--
 .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala  | 5 -
 .../apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d7b4c095/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 8d0c64e..966623e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -329,7 +329,10 @@ object CatalystTypeConverters {
 null
   }
 }
-override def toScala(catalystValue: Decimal): JavaBigDecimal = 
catalystValue.toJavaBigDecimal
+override def toScala(catalystValue: Decimal): JavaBigDecimal = {
+  if (catalystValue == null) null
+  else catalystValue.toJavaBigDecimal
+}
 override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
   row.getDecimal(column, dataType.precision, 
dataType.scale).toJavaBigDecimal
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b4c095/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index df0f045..03bb102 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -32,7 +32,9 @@ class CatalystTypeConvertersSuite extends SparkFunSuite {
 IntegerType,
 LongType,
 FloatType,
-DoubleType)
+DoubleType,
+DecimalType.SYSTEM_DEFAULT,
+DecimalType.USER_DEFAULT)
 
   test("null handling in rows") {
 val schema = StructType(simpleTypes.map(t => 
StructField(t.getClass.getName, t)))





spark git commit: HOTFIX: Adding missing 1.4.1 ec2 version

2015-08-24 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 a4bad5f25 -> 8ca8bdd01


HOTFIX: Adding missing 1.4.1 ec2 version
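
One way to catch this kind of omission is a small consistency check between the two tables. The sketch below uses only the entries visible in the diff, not the full tables from `spark_ec2.py`, and it assumes that every listed version is expected to have a Tachyon mapping, which the diff suggests for these versions but does not state in general. The helper `missing_tachyon_versions` is hypothetical.

```python
# Excerpt only: the entries shown in the diff, not the complete tables.
VALID_SPARK_VERSIONS = {"1.3.0", "1.3.1", "1.4.0", "1.4.1", "1.5.0"}

SPARK_TACHYON_MAP = {
    "1.3.0": "0.5.0",
    "1.3.1": "0.5.0",
    "1.4.0": "0.6.4",
    "1.4.1": "0.6.4",
    "1.5.0": "0.7.1",
}


def missing_tachyon_versions(valid, tachyon_map):
    """Return, sorted, the versions in `valid` that have no entry in `tachyon_map`."""
    return sorted(v for v in valid if v not in tachyon_map)
```

Calling `missing_tachyon_versions(VALID_SPARK_VERSIONS, SPARK_TACHYON_MAP)` on this excerpt returns an empty list; dropping the `"1.4.1"` key from the map makes it report that version.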


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca8bdd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca8bdd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca8bdd0

Branch: refs/heads/branch-1.5
Commit: 8ca8bdd015c53ff0c4705886545fc30eef8b8359
Parents: a4bad5f
Author: Patrick Wendell 
Authored: Mon Aug 24 17:22:09 2015 -0700
Committer: Patrick Wendell 
Committed: Mon Aug 24 17:22:09 2015 -0700

--
 ec2/spark_ec2.py | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ca8bdd0/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index ccc897f..3a2361c 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -71,6 +71,7 @@ VALID_SPARK_VERSIONS = set([
 "1.3.0",
 "1.3.1",
 "1.4.0",
+"1.4.1",
 "1.5.0"
 ])
 
@@ -85,6 +86,7 @@ SPARK_TACHYON_MAP = {
 "1.3.0": "0.5.0",
 "1.3.1": "0.5.0",
 "1.4.0": "0.6.4",
+"1.4.1": "0.6.4",
 "1.5.0": "0.7.1"
 }
 





spark git commit: [SPARK-10165] [SQL] Await child resolution in ResolveFunctions

2015-08-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master d7b4c0952 -> 2bf338c62


[SPARK-10165] [SQL] Await child resolution in ResolveFunctions

Currently, we eagerly attempt to resolve functions, even before their children 
are resolved.  However, this is not valid in cases where we need to know the 
types of the input arguments (i.e. when resolving Hive UDFs).

As a fix, this PR delays function resolution until the function's children are 
resolved.  This change also necessitates a change to the way we resolve 
aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or 
`ORDER BY` clauses).  Specifically, we can't assume that these misplaced 
functions will be resolved, allowing us to differentiate aggregate functions 
from normal functions.  To compensate for this change we now attempt to resolve 
these unresolved expressions in the context of the aggregate operator, before 
checking to see if any aggregate expressions are present.
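
The core idea, skipping a function until its children are resolved and relying on repeated passes to pick it up later, can be sketched in Python. This is a toy model under stated assumptions, not the Catalyst analyzer: `Expr`, `REGISTRY`, `resolve_once`, and `analyze` are all hypothetical, and the registry is keyed by argument types to mimic lookups that need the input types.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Expr:
    """Hypothetical expression node; `dtype` stays None until resolved."""
    name: str
    children: List["Expr"] = field(default_factory=list)
    dtype: Optional[str] = None

    @property
    def resolved(self) -> bool:
        return self.dtype is not None

    @property
    def children_resolved(self) -> bool:
        return all(c.resolved for c in self.children)


# Hypothetical registry: the result type depends on the argument types,
# so a lookup is only meaningful once the children are resolved.
REGISTRY: Dict[Tuple[str, Tuple[str, ...]], str] = {
    ("plus", ("int", "int")): "int",
    ("plus", ("double", "double")): "double",
}


def resolve_once(expr: Expr, column_types: Dict[str, str]) -> bool:
    """Apply one top-down pass; return True if anything changed."""
    changed = False
    if not expr.resolved:
        if not expr.children and expr.name in column_types:
            # Leaf attribute: resolve directly from the schema.
            expr.dtype = column_types[expr.name]
            changed = True
        elif expr.children and expr.children_resolved:
            # Function with resolved children: look up by argument types.
            arg_types = tuple(c.dtype for c in expr.children)
            expr.dtype = REGISTRY[(expr.name, arg_types)]
            changed = True
        # Otherwise: children are unresolved, so skip this node for now.
    for child in expr.children:
        changed = resolve_once(child, column_types) or changed
    return changed


def analyze(expr: Expr, column_types: Dict[str, str], max_iterations: int = 10) -> Expr:
    """Repeat passes until nothing changes (a fixed point) or the cap is hit."""
    for _ in range(max_iterations):
        if not resolve_once(expr, column_types):
            break
    return expr
```

For a nested call such as `plus(a, plus(b, c))`, the first pass resolves only the leaves, the second resolves the inner `plus`, and the third resolves the outer one.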

Author: Michael Armbrust 

Closes #8371 from marmbrus/hiveUDFResolution.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bf338c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bf338c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bf338c6

Branch: refs/heads/master
Commit: 2bf338c626e9d97ccc033cfadae8b36a82c66fd1
Parents: d7b4c09
Author: Michael Armbrust 
Authored: Mon Aug 24 18:10:51 2015 -0700
Committer: Michael Armbrust 
Committed: Mon Aug 24 18:10:51 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 116 ---
 .../spark/sql/hive/execution/HiveUDFSuite.scala |   5 +
 2 files changed, 77 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bf338c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d0eb9c2..1a5de15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -78,7 +78,7 @@ class Analyzer(
   ResolveAliases ::
   ExtractWindowExpressions ::
   GlobalAggregates ::
-  UnresolvedHavingClauseAttributes ::
+  ResolveAggregateFunctions ::
   HiveTypeCoercion.typeCoercionRules ++
   extendedResolutionRules : _*),
 Batch("Nondeterministic", Once,
@@ -452,37 +452,6 @@ class Analyzer(
   logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}")
   s // Nothing we can do here. Return original plan.
 }
-  case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
-  if !s.resolved && a.resolved =>
-// A small hack to create an object that will allow us to resolve any 
references that
-// refer to named expressions that are present in the grouping 
expressions.
-val groupingRelation = LocalRelation(
-  grouping.collect { case ne: NamedExpression => ne.toAttribute }
-)
-
-// Find sort attributes that are projected away so we can temporarily 
add them back in.
-val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, 
groupingRelation)
-
-// Find aggregate expressions and evaluate them early, since they 
can't be evaluated in a
-// Sort.
-val (withAggsRemoved, aliasedAggregateList) = newOrdering.map {
-  case aggOrdering if aggOrdering.collect { case a: 
AggregateExpression => a }.nonEmpty =>
-val aliased = Alias(aggOrdering.child, "_aggOrdering")()
-(aggOrdering.copy(child = aliased.toAttribute), Some(aliased))
-
-  case other => (other, None)
-}.unzip
-
-val missing = missingAttr ++ aliasedAggregateList.flatten
-
-if (missing.nonEmpty) {
-  // Add missing grouping exprs and then project them away after the 
sort.
-  Project(a.output,
-Sort(withAggsRemoved, global,
-  Aggregate(grouping, aggs ++ missing, child)))
-} else {
-  s // Nothing we can do here. Return original plan.
-}
 }
 
 /**
@@ -515,6 +484,7 @@ class Analyzer(
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case q: LogicalPlan =>
 q transformExpressions {
+  case u if !u.childrenResolved => u // Skip until children are 
resolved.
   case u @ UnresolvedFunction(name, children, isDistinct) =>
 withPosition(u) {
   registry.lookupFunction(name, children) match {
@@ -559,21 +529,79 @@ class Analyzer(
   }
 

spark git commit: [SPARK-10165] [SQL] Await child resolution in ResolveFunctions

2015-08-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 8ca8bdd01 -> 228e429eb


[SPARK-10165] [SQL] Await child resolution in ResolveFunctions

Currently, we eagerly attempt to resolve functions, even before their children 
are resolved.  However, this is not valid in cases where we need to know the 
types of the input arguments (i.e. when resolving Hive UDFs).

As a fix, this PR delays function resolution until the function's children are 
resolved.  This change also necessitates a change to the way we resolve 
aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or 
`ORDER BY` clauses).  Specifically, we can't assume that these misplaced 
functions will be resolved, allowing us to differentiate aggregate functions 
from normal functions.  To compensate for this change we now attempt to resolve 
these unresolved expressions in the context of the aggregate operator, before 
checking to see if any aggregate expressions are present.

Author: Michael Armbrust 

Closes #8371 from marmbrus/hiveUDFResolution.

(cherry picked from commit 2bf338c626e9d97ccc033cfadae8b36a82c66fd1)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/228e429e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/228e429e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/228e429e

Branch: refs/heads/branch-1.5
Commit: 228e429ebf1f367de9087f74cf3ff43bbd32f382
Parents: 8ca8bdd
Author: Michael Armbrust 
Authored: Mon Aug 24 18:10:51 2015 -0700
Committer: Michael Armbrust 
Committed: Mon Aug 24 18:11:04 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 116 ---
 .../spark/sql/hive/execution/HiveUDFSuite.scala |   5 +
 2 files changed, 77 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/228e429e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d0eb9c2..1a5de15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -78,7 +78,7 @@ class Analyzer(
   ResolveAliases ::
   ExtractWindowExpressions ::
   GlobalAggregates ::
-  UnresolvedHavingClauseAttributes ::
+  ResolveAggregateFunctions ::
   HiveTypeCoercion.typeCoercionRules ++
   extendedResolutionRules : _*),
 Batch("Nondeterministic", Once,
@@ -452,37 +452,6 @@ class Analyzer(
   logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}")
   s // Nothing we can do here. Return original plan.
 }
-  case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
-  if !s.resolved && a.resolved =>
-// A small hack to create an object that will allow us to resolve any 
references that
-// refer to named expressions that are present in the grouping 
expressions.
-val groupingRelation = LocalRelation(
-  grouping.collect { case ne: NamedExpression => ne.toAttribute }
-)
-
-// Find sort attributes that are projected away so we can temporarily 
add them back in.
-val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, 
groupingRelation)
-
-// Find aggregate expressions and evaluate them early, since they 
can't be evaluated in a
-// Sort.
-val (withAggsRemoved, aliasedAggregateList) = newOrdering.map {
-  case aggOrdering if aggOrdering.collect { case a: 
AggregateExpression => a }.nonEmpty =>
-val aliased = Alias(aggOrdering.child, "_aggOrdering")()
-(aggOrdering.copy(child = aliased.toAttribute), Some(aliased))
-
-  case other => (other, None)
-}.unzip
-
-val missing = missingAttr ++ aliasedAggregateList.flatten
-
-if (missing.nonEmpty) {
-  // Add missing grouping exprs and then project them away after the 
sort.
-  Project(a.output,
-Sort(withAggsRemoved, global,
-  Aggregate(grouping, aggs ++ missing, child)))
-} else {
-  s // Nothing we can do here. Return original plan.
-}
 }
 
 /**
@@ -515,6 +484,7 @@ class Analyzer(
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case q: LogicalPlan =>
 q transformExpressions {
+  case u if !u.childrenResolved => u // Skip until children are 
resolved.
   case u @ UnresolvedFunction(name, children, isDistinct) =>
 withPosition(u)

spark git commit: [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release

2015-08-24 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 2bf338c62 -> 6511bf559


[SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release

cc: shivaram

## Summary

- Modify the `rdname` of expression functions, e.g. for `ascii`: `rdname functions` => 
`rdname ascii`
- Replace the dynamic function definitions with static ones for the sake of their 
documentation.
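
The documentation trade-off between generated and hand-written definitions can be illustrated with a loose Python analogy; it is not a translation of the SparkR code. The factory `_make_unary` and the two wrapped functions are chosen only for illustration.

```python
import math


def _make_unary(name):
    # Dynamic style: one factory builds every wrapper, so all of them end up
    # with the same generic documentation.
    def wrapper(x):
        """Apply a math function to x."""
        return getattr(math, name)(x)
    wrapper.__name__ = name
    return wrapper


dynamic = {name: _make_unary(name) for name in ("sqrt", "floor")}


# Static style: each function is written out, so it can carry its own docs.
def sqrt(x):
    """Return the square root of x."""
    return math.sqrt(x)


def floor(x):
    """Return the largest integer less than or equal to x."""
    return math.floor(x)
```

Both styles compute the same values; only the static definitions have docstrings that describe each function individually.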

## Generated PDF File
https://drive.google.com/file/d/0B9biIZIU47lLX2t6ZjRoRnBTSEU/view?usp=sharing

## JIRA
[[SPARK-10118] Improve SparkR API docs for 1.5 release - ASF 
JIRA](https://issues.apache.org/jira/browse/SPARK-10118)

Author: Yu ISHIKAWA 
Author: Yuu ISHIKAWA 

Closes #8386 from yu-iskw/SPARK-10118.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6511bf55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6511bf55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6511bf55

Branch: refs/heads/master
Commit: 6511bf559b736d8e23ae398901c8d78938e66869
Parents: 2bf338c
Author: Yu ISHIKAWA 
Authored: Mon Aug 24 18:17:51 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Mon Aug 24 18:17:51 2015 -0700

--
 R/create-docs.sh|2 +-
 R/pkg/R/column.R|5 +-
 R/pkg/R/functions.R | 1603 ++
 R/pkg/R/generics.R  |  214 +++
 4 files changed, 1596 insertions(+), 228 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/create-docs.sh
--
diff --git a/R/create-docs.sh b/R/create-docs.sh
index 6a4687b..d2ae160 100755
--- a/R/create-docs.sh
+++ b/R/create-docs.sh
@@ -39,7 +39,7 @@ pushd $FWDIR
 mkdir -p pkg/html
 pushd pkg/html
 
-Rscript -e 'library(SparkR, lib.loc="../../lib"); library(knitr); 
knit_rd("SparkR")'
+Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); 
library(knitr); knit_rd("SparkR", links = tools::findHTMLlinks(paste(libDir, 
"SparkR", sep="/")))'
 
 popd
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/pkg/R/column.R
--
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
index 5a07ebd..a1f50c3 100644
--- a/R/pkg/R/column.R
+++ b/R/pkg/R/column.R
@@ -169,8 +169,7 @@ setMethod("between", signature(x = "Column"),
 #'
 #' @rdname column
 #'
-#' @examples
-#' \dontrun{
+#' @examples \dontrun{
 #'   cast(df$age, "string")
 #'   cast(df$name, list(type="array", elementType="byte", containsNull = TRUE))
 #' }
@@ -192,7 +191,7 @@ setMethod("cast",
 #'
 #' @rdname column
 #' @return a matched values as a result of comparing with given values.
-#' \dontrun{
+#' @examples \dontrun{
 #'   filter(df, "age in (10, 30)")
 #'   where(df, df$age %in% c(10, 30))
 #' }

http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index b5879bd..d848730 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -18,69 +18,1298 @@
 #' @include generics.R column.R
 NULL
 
-#' @title S4 expression functions for DataFrame column(s)
-#' @description These are expression functions on DataFrame columns
-
-functions1 <- c(
-  "abs", "acos", "approxCountDistinct", "ascii", "asin", "atan",
-  "avg", "base64", "bin", "bitwiseNOT", "cbrt", "ceil", "cos", "cosh", "count",
-  "crc32", "dayofmonth", "dayofyear", "exp", "explode", "expm1", "factorial",
-  "first", "floor", "hex", "hour", "initcap", "isNaN", "last", "last_day",
-  "length", "log", "log10", "log1p", "log2", "lower", "ltrim", "max", "md5",
-  "mean", "min", "minute", "month", "negate", "quarter", "reverse",
-  "rint", "round", "rtrim", "second", "sha1", "signum", "sin", "sinh", "size",
-  "soundex", "sqrt", "sum", "sumDistinct", "tan", "tanh", "toDegrees",
-  "toRadians", "to_date", "trim", "unbase64", "unhex", "upper", "weekofyear",
-  "year")
-functions2 <- c(
-  "atan2", "datediff", "hypot", "levenshtein", "months_between", "nanvl", 
"pmod")
-
-createFunction1 <- function(name) {
-  setMethod(name,
-signature(x = "Column"),
-function(x) {
-  jc <- callJStatic("org.apache.spark.sql.functions", name, x@jc)
-  column(jc)
-})
-}
-
-createFunction2 <- function(name) {
-  setMethod(name,
-signature(y = "Column"),
-function(y, x) {
-  if (class(x) == "Column") {
-x <- x@jc
-  }
-  jc <- callJStatic("org.apache.spark.sql.functions", name, y@jc, 
x)
-  column(jc)
-})
-}
+#' Creates a \code{Column} of literal value.
+#'
+#' The passed in object is returned directly if it is already a 
\linkS4class{Column}.
+#' If the object is a Scala Symbol, it 

spark git commit: [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release

2015-08-24 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 228e429eb -> ec5d09c0f


[SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release

cc: shivaram

## Summary

- Modify the `rdname` of expression functions, e.g. for `ascii`: `rdname functions` => 
`rdname ascii`
- Replace the dynamic function definitions with static ones for the sake of their 
documentation.

## Generated PDF File
https://drive.google.com/file/d/0B9biIZIU47lLX2t6ZjRoRnBTSEU/view?usp=sharing

## JIRA
[[SPARK-10118] Improve SparkR API docs for 1.5 release - ASF 
JIRA](https://issues.apache.org/jira/browse/SPARK-10118)

Author: Yu ISHIKAWA 
Author: Yuu ISHIKAWA 

Closes #8386 from yu-iskw/SPARK-10118.

(cherry picked from commit 6511bf559b736d8e23ae398901c8d78938e66869)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec5d09c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec5d09c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec5d09c0

Branch: refs/heads/branch-1.5
Commit: ec5d09c0f0f1f61d6d80a35adaba3a8102184740
Parents: 228e429
Author: Yu ISHIKAWA 
Authored: Mon Aug 24 18:17:51 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Mon Aug 24 18:17:58 2015 -0700

--
 R/create-docs.sh|2 +-
 R/pkg/R/column.R|5 +-
 R/pkg/R/functions.R | 1603 ++
 R/pkg/R/generics.R  |  214 +++
 4 files changed, 1596 insertions(+), 228 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/create-docs.sh
--
diff --git a/R/create-docs.sh b/R/create-docs.sh
index 6a4687b..d2ae160 100755
--- a/R/create-docs.sh
+++ b/R/create-docs.sh
@@ -39,7 +39,7 @@ pushd $FWDIR
 mkdir -p pkg/html
 pushd pkg/html
 
-Rscript -e 'library(SparkR, lib.loc="../../lib"); library(knitr); 
knit_rd("SparkR")'
+Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); 
library(knitr); knit_rd("SparkR", links = tools::findHTMLlinks(paste(libDir, 
"SparkR", sep="/")))'
 
 popd
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/pkg/R/column.R
--
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
index 5a07ebd..a1f50c3 100644
--- a/R/pkg/R/column.R
+++ b/R/pkg/R/column.R
@@ -169,8 +169,7 @@ setMethod("between", signature(x = "Column"),
 #'
 #' @rdname column
 #'
-#' @examples
-#' \dontrun{
+#' @examples \dontrun{
 #'   cast(df$age, "string")
 #'   cast(df$name, list(type="array", elementType="byte", containsNull = TRUE))
 #' }
@@ -192,7 +191,7 @@ setMethod("cast",
 #'
 #' @rdname column
 #' @return a matched values as a result of comparing with given values.
-#' \dontrun{
+#' @examples \dontrun{
 #'   filter(df, "age in (10, 30)")
 #'   where(df, df$age %in% c(10, 30))
 #' }

http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index b5879bd..d848730 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -18,69 +18,1298 @@
 #' @include generics.R column.R
 NULL
 
-#' @title S4 expression functions for DataFrame column(s)
-#' @description These are expression functions on DataFrame columns
-
-functions1 <- c(
-  "abs", "acos", "approxCountDistinct", "ascii", "asin", "atan",
-  "avg", "base64", "bin", "bitwiseNOT", "cbrt", "ceil", "cos", "cosh", "count",
-  "crc32", "dayofmonth", "dayofyear", "exp", "explode", "expm1", "factorial",
-  "first", "floor", "hex", "hour", "initcap", "isNaN", "last", "last_day",
-  "length", "log", "log10", "log1p", "log2", "lower", "ltrim", "max", "md5",
-  "mean", "min", "minute", "month", "negate", "quarter", "reverse",
-  "rint", "round", "rtrim", "second", "sha1", "signum", "sin", "sinh", "size",
-  "soundex", "sqrt", "sum", "sumDistinct", "tan", "tanh", "toDegrees",
-  "toRadians", "to_date", "trim", "unbase64", "unhex", "upper", "weekofyear",
-  "year")
-functions2 <- c(
-  "atan2", "datediff", "hypot", "levenshtein", "months_between", "nanvl", 
"pmod")
-
-createFunction1 <- function(name) {
-  setMethod(name,
-signature(x = "Column"),
-function(x) {
-  jc <- callJStatic("org.apache.spark.sql.functions", name, x@jc)
-  column(jc)
-})
-}
-
-createFunction2 <- function(name) {
-  setMethod(name,
-signature(y = "Column"),
-function(y, x) {
-  if (class(x) == "Column") {
-x <- x@jc
-  }
-  jc <- callJStatic("org.apache.spark.sql.functions", name, y@jc, 
x)
-  column(jc)
-})
-}
+#' Creates a \code{Column} of literal value.
+#'
+#' The pass

spark git commit: [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 ec5d09c0f -> 2f7e4b416


[SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products

 * Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with 
`SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is 
essentially a wrapper for the latter
 * Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any 
`RDD[Product]`, not just case classes
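
As a small illustration of why the wording "RDD of Product" is the more accurate one: in plain Scala, both case classes and tuples are subtypes of `Product`, so a method bounded by `A <: Product` accepts either. The sketch below does not use Spark; `ProductSketch`, `Person`, and `fieldCounts` are hypothetical names chosen for this example only.

```scala
object ProductSketch {
  // Hypothetical case class, used only for illustration.
  final case class Person(name: String, age: Int)

  // Accepts any sequence whose element type is a Product, mirroring the
  // `A <: Product` bound on `createDataFrame`. Returns the number of
  // fields in each row.
  def fieldCounts[A <: Product](rows: Seq[A]): Seq[Int] =
    rows.map(_.productArity)
}
```

Calling `ProductSketch.fieldCounts` with a `Seq` of `Person` values or a `Seq` of tuples compiles in both cases, which is the point the updated scaladoc makes.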

Author: Feynman Liang 

Closes #8406 from feynmanliang/sql-doc-fixes.

(cherry picked from commit 642c43c81c835139e3f35dfd6a215d668a474203)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f7e4b41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f7e4b41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f7e4b41

Branch: refs/heads/branch-1.5
Commit: 2f7e4b416492ff3c2ea7fcab05b57ed9f0c6e45b
Parents: ec5d09c
Author: Feynman Liang 
Authored: Mon Aug 24 19:45:41 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 19:45:48 2015 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala   | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f7e4b41/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 126c9c6..a1eea09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -350,7 +350,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
* :: Experimental ::
-   * Creates a DataFrame from an RDD of case classes.
+   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
*
* @group dataframes
* @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/2f7e4b41/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 47b6f80..bf03c61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -40,7 +40,7 @@ private[sql] abstract class SQLImplicits {
   implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
 
   /**
-   * Creates a DataFrame from an RDD of case classes or tuples.
+   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
* @since 1.3.0
*/
   implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): 
DataFrameHolder = {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6511bf559 -> 642c43c81


[SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products

 * Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with 
`SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is 
essentially a wrapper for the latter
 * Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any 
`RDD[Product]`, not just case classes

Author: Feynman Liang 

Closes #8406 from feynmanliang/sql-doc-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/642c43c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/642c43c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/642c43c8

Branch: refs/heads/master
Commit: 642c43c81c835139e3f35dfd6a215d668a474203
Parents: 6511bf5
Author: Feynman Liang 
Authored: Mon Aug 24 19:45:41 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 19:45:41 2015 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala   | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/642c43c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 126c9c6..a1eea09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -350,7 +350,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
* :: Experimental ::
-   * Creates a DataFrame from an RDD of case classes.
+   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
*
* @group dataframes
* @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/642c43c8/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 47b6f80..bf03c61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -40,7 +40,7 @@ private[sql] abstract class SQLImplicits {
   implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
 
   /**
-   * Creates a DataFrame from an RDD of case classes or tuples.
+   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
* @since 1.3.0
*/
   implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): 
DataFrameHolder = {





spark git commit: [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state

2015-08-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 642c43c81 -> a0c0aae1d


[SPARK-10121] [SQL] Thrift server always use the latest class loader provided 
by the conf of executionHive's state

https://issues.apache.org/jira/browse/SPARK-10121

Looks like the problem is that if we add a jar through another thread, the 
thread handling the JDBC session will not get the latest classloader.

Author: Yin Huai 

Closes #8368 from yhuai/SPARK-10121.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0c0aae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0c0aae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0c0aae1

Branch: refs/heads/master
Commit: a0c0aae1defe5e1e57704065631d201f8e3f6bac
Parents: 642c43c
Author: Yin Huai 
Authored: Tue Aug 25 12:49:50 2015 +0800
Committer: Cheng Lian 
Committed: Tue Aug 25 12:49:50 2015 +0800

--
 .../SparkExecuteStatementOperation.scala|  6 +++
 .../thriftserver/HiveThriftServer2Suites.scala  | 54 
 2 files changed, 60 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a0c0aae1/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 833bf62..02cc7e5 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -159,6 +159,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   // User information is part of the metastore client member in 
Hive
   hiveContext.setSession(currentSqlSession)
+  // Always use the latest class loader provided by 
executionHive's state.
+  val executionHiveClassLoader =
+hiveContext.executionHive.state.getConf.getClassLoader
+  sessionHive.getConf.setClassLoader(executionHiveClassLoader)
+  
parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
+
   Hive.set(sessionHive)
   SessionState.setCurrentSessionState(parentSessionState)
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/a0c0aae1/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
--
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ded42bc..b72249b 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -377,6 +377,60 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
   rs2.close()
 }
   }
+
+  test("test add jar") {
+withMultipleConnectionJdbcStatement(
+  {
+statement =>
+  val jarFile =
+"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
+  .split("/")
+  .mkString(File.separator)
+
+  statement.executeQuery(s"ADD JAR $jarFile")
+  },
+
+  {
+statement =>
+  val queries = Seq(
+"DROP TABLE IF EXISTS smallKV",
+"CREATE TABLE smallKV(key INT, val STRING)",
+s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO 
TABLE smallKV",
+"DROP TABLE IF EXISTS addJar",
+"""CREATE TABLE addJar(key string)
+  |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+""".stripMargin)
+
+  queries.foreach(statement.execute)
+
+  statement.executeQuery(
+"""
+  |INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1
+""".stripMargin)
+
+  val actualResult =
+statement.executeQuery("SELECT key FROM addJar")
+  val actualResultBuffer = new collection.mutable.ArrayBuffer[String]()
+  while (actualResult.next()) {
+actualResultBuffer += actualResult.getString(1)
+  }
+  actualResult.close()
+
+  val expectedResult =
+statement.executeQuery("SELECT 'k1'")
+  val expectedResultBuffer = ne

spark git commit: [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state

2015-08-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2f7e4b416 -> c99f4160b


[SPARK-10121] [SQL] Thrift server always use the latest class loader provided 
by the conf of executionHive's state

https://issues.apache.org/jira/browse/SPARK-10121

Looks like the problem is that if we add a jar through another thread, the 
thread handling the JDBC session will not get the latest classloader.

Author: Yin Huai 

Closes #8368 from yhuai/SPARK-10121.

(cherry picked from commit a0c0aae1defe5e1e57704065631d201f8e3f6bac)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c99f4160
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c99f4160
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c99f4160

Branch: refs/heads/branch-1.5
Commit: c99f4160b98bc0685c23fee4eb7b892c47f6feda
Parents: 2f7e4b4
Author: Yin Huai 
Authored: Tue Aug 25 12:49:50 2015 +0800
Committer: Cheng Lian 
Committed: Tue Aug 25 12:50:44 2015 +0800

--
 .../SparkExecuteStatementOperation.scala|  6 +++
 .../thriftserver/HiveThriftServer2Suites.scala  | 54 
 2 files changed, 60 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c99f4160/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 833bf62..02cc7e5 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -159,6 +159,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   // User information is part of the metastore client member in 
Hive
   hiveContext.setSession(currentSqlSession)
+  // Always use the latest class loader provided by 
executionHive's state.
+  val executionHiveClassLoader =
+hiveContext.executionHive.state.getConf.getClassLoader
+  sessionHive.getConf.setClassLoader(executionHiveClassLoader)
+  
parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
+
   Hive.set(sessionHive)
   SessionState.setCurrentSessionState(parentSessionState)
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/c99f4160/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
--
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ded42bc..b72249b 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -377,6 +377,60 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
   rs2.close()
 }
   }
+
+  test("test add jar") {
+withMultipleConnectionJdbcStatement(
+  {
+statement =>
+  val jarFile =
+"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
+  .split("/")
+  .mkString(File.separator)
+
+  statement.executeQuery(s"ADD JAR $jarFile")
+  },
+
+  {
+statement =>
+  val queries = Seq(
+"DROP TABLE IF EXISTS smallKV",
+"CREATE TABLE smallKV(key INT, val STRING)",
+s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO 
TABLE smallKV",
+"DROP TABLE IF EXISTS addJar",
+"""CREATE TABLE addJar(key string)
+  |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+""".stripMargin)
+
+  queries.foreach(statement.execute)
+
+  statement.executeQuery(
+"""
+  |INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1
+""".stripMargin)
+
+  val actualResult =
+statement.executeQuery("SELECT key FROM addJar")
+  val actualResultBuffer = new collection.mutable.ArrayBuffer[String]()
+  while (actualResult.next()) {
+actualResultBuffer += actualResult.getString(1)
+  }
+  actualResult.close()
+
+  val exp

spark git commit: [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a0c0aae1d -> 5175ca0c8


[SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables

In `HiveComparisionTest`s it is possible to fail a query of the form `SELECT * 
FROM dest1`, where `dest1` is the query that is actually computing the 
incorrect results.  To aid debugging this patch improves the harness to also 
print these query plans and their results.

Author: Michael Armbrust 

Closes #8388 from marmbrus/generatedTables.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5175ca0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5175ca0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5175ca0c

Branch: refs/heads/master
Commit: 5175ca0c85b10045d12c3fb57b1e52278a413ecf
Parents: a0c0aae
Author: Michael Armbrust 
Authored: Mon Aug 24 23:15:27 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 23:15:27 2015 -0700

--
 .../sql/hive/execution/HiveComparisonTest.scala | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5175ca0c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 2bdb0e1..4d45249 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution
 
 import java.io._
 
+import scala.util.control.NonFatal
+
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import org.apache.spark.{Logging, SparkFunSuite}
@@ -386,11 +388,45 @@ abstract class HiveComparisonTest
 hiveCacheFiles.foreach(_.delete())
   }
 
+  // If this query is reading other tables that were created 
during this test run
+  // also print out the query plans and results for those.
+  val computedTablesMessages: String = try {
+val tablesRead = new 
TestHive.QueryExecution(query).executedPlan.collect {
+  case ts: HiveTableScan => ts.relation.tableName
+}.toSet
+
+TestHive.reset()
+val executions = queryList.map(new TestHive.QueryExecution(_))
+executions.foreach(_.toRdd)
+val tablesGenerated = queryList.zip(executions).flatMap {
+  case (q, e) => e.executedPlan.collect {
+case i: InsertIntoHiveTable if tablesRead contains 
i.table.tableName =>
+  (q, e, i)
+  }
+}
+
+tablesGenerated.map { case (hiveql, execution, insert) =>
+  s"""
+ |=== Generated Table ===
+ |$hiveql
+ |$execution
+ |== Results ==
+ |${insert.child.execute().collect().mkString("\n")}
+   """.stripMargin
+}.mkString("\n")
+
+  } catch {
+case NonFatal(e) =>
+  logError("Failed to compute generated tables", e)
+  s"Couldn't compute dependent tables: $e"
+  }
+
   val errorMessage =
 s"""
   |Results do not match for $testCaseName:
   
|$hiveQuery\n${hiveQuery.analyzed.output.map(_.name).mkString("\t")}
   |$resultComparison
+  |$computedTablesMessages
 """.stripMargin
 
   stringToFile(new File(wrongDirectory, testCaseName), 
errorMessage + consoleTestCase)





spark git commit: [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables

2015-08-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 c99f4160b -> 2239a2036


[SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables

In `HiveComparisionTest`s it is possible to fail a query of the form `SELECT * 
FROM dest1`, where `dest1` is the query that is actually computing the 
incorrect results.  To aid debugging this patch improves the harness to also 
print these query plans and their results.

Author: Michael Armbrust 

Closes #8388 from marmbrus/generatedTables.

(cherry picked from commit 5175ca0c85b10045d12c3fb57b1e52278a413ecf)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2239a203
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2239a203
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2239a203

Branch: refs/heads/branch-1.5
Commit: 2239a20368b7833ffe0059941478924c7be87bbe
Parents: c99f416
Author: Michael Armbrust 
Authored: Mon Aug 24 23:15:27 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 24 23:15:34 2015 -0700

--
 .../sql/hive/execution/HiveComparisonTest.scala | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2239a203/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 2bdb0e1..4d45249 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution
 
 import java.io._
 
+import scala.util.control.NonFatal
+
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import org.apache.spark.{Logging, SparkFunSuite}
@@ -386,11 +388,45 @@ abstract class HiveComparisonTest
 hiveCacheFiles.foreach(_.delete())
   }
 
+  // If this query is reading other tables that were created 
during this test run
+  // also print out the query plans and results for those.
+  val computedTablesMessages: String = try {
+val tablesRead = new 
TestHive.QueryExecution(query).executedPlan.collect {
+  case ts: HiveTableScan => ts.relation.tableName
+}.toSet
+
+TestHive.reset()
+val executions = queryList.map(new TestHive.QueryExecution(_))
+executions.foreach(_.toRdd)
+val tablesGenerated = queryList.zip(executions).flatMap {
+  case (q, e) => e.executedPlan.collect {
+case i: InsertIntoHiveTable if tablesRead contains 
i.table.tableName =>
+  (q, e, i)
+  }
+}
+
+tablesGenerated.map { case (hiveql, execution, insert) =>
+  s"""
+ |=== Generated Table ===
+ |$hiveql
+ |$execution
+ |== Results ==
+ |${insert.child.execute().collect().mkString("\n")}
+   """.stripMargin
+}.mkString("\n")
+
+  } catch {
+case NonFatal(e) =>
+  logError("Failed to compute generated tables", e)
+  s"Couldn't compute dependent tables: $e"
+  }
+
   val errorMessage =
 s"""
   |Results do not match for $testCaseName:
   
|$hiveQuery\n${hiveQuery.analyzed.output.map(_.name).mkString("\t")}
   |$resultComparison
+  |$computedTablesMessages
 """.stripMargin
 
   stringToFile(new File(wrongDirectory, testCaseName), 
errorMessage + consoleTestCase)





spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 5175ca0c8 -> d9c25dec8


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0
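
The behaviour this fix targets can be sketched as a standalone function. With the old `Math.min(maxRateLimitPerPartition, limit / numPartitions)`, a maximum of 0 (the default, meaning "no cap") always produced 0, so the backpressure estimate was effectively discarded. The sketch below is a simplified restatement of the patched logic, not the class itself; the object name, the parameter list, and the use of `Long` are assumptions made for this example.

```scala
object RateLimitSketch {
  // estimatedRateLimit: the rate suggested by backpressure, if any.
  // maxRateLimitPerPartition: configured cap; 0 is taken to mean "no cap".
  def effectiveRateLimitPerPartition(
      estimatedRateLimit: Option[Long],
      maxRateLimitPerPartition: Long,
      numPartitions: Int): Long =
    estimatedRateLimit
      .filter(_ > 0)
      .map { limit =>
        val perPartition = limit / numPartitions
        // Only apply the cap when one is actually configured.
        if (maxRateLimitPerPartition > 0) math.min(maxRateLimitPerPartition, perPartition)
        else perPartition
      }
      .getOrElse(maxRateLimitPerPartition)
}
```

With an estimate of 1000 across 4 partitions and no cap configured, this returns 250 rather than 0; with a cap of 100 it returns 100.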

Author: cody koeninger 

Closes #8413 from koeninger/backpressure-testing-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c25dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c25dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c25dec

Branch: refs/heads/master
Commit: d9c25dec87e6da7d66a47ff94e7eefa008081b9d
Parents: 5175ca0
Author: cody koeninger 
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 23:26:14 2015 -0700

--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d9c25dec/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..194 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
 val effectiveRateLimitPerPartition = estimatedRateLimit
   .filter(_ > 0)
-  .map(limit => Math.min(maxRateLimitPerPartition, (limit / 
numPartitions)))
-  .getOrElse(maxRateLimitPerPartition)
+  .map { limit =>
+if (maxRateLimitPerPartition > 0) {
+  Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+} else {
+  limit / numPartitions
+}
+  }.getOrElse(maxRateLimitPerPartition)
 
 if (effectiveRateLimitPerPartition > 0) {
   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000





spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2239a2036 -> 88991dc4f


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger 

Closes #8413 from koeninger/backpressure-testing-master.

(cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4

Branch: refs/heads/branch-1.5
Commit: 88991dc4f04b0c88466c6eab5ada43506c981341
Parents: 2239a20
Author: cody koeninger 
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 23:26:27 2015 -0700

--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..194 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
 val effectiveRateLimitPerPartition = estimatedRateLimit
   .filter(_ > 0)
-  .map(limit => Math.min(maxRateLimitPerPartition, (limit / 
numPartitions)))
-  .getOrElse(maxRateLimitPerPartition)
+  .map { limit =>
+if (maxRateLimitPerPartition > 0) {
+  Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+} else {
+  limit / numPartitions
+}
+  }.getOrElse(maxRateLimitPerPartition)
 
 if (effectiveRateLimitPerPartition > 0) {
   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000





spark git commit: [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 88991dc4f -> bb1357f36


[SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers 
returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: 
r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will 
return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Let's assume r1 starts first on `host1`, as `scheduleReceivers` suggested, 
and tries to register with ReceiverTracker. But the previous 
`ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) 
according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 
0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected 
since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring the information of the receiver that is 
being rescheduled in `receiverTrackingInfoMap`.

2) Assume there are 3 executors (host1, host2, host3), each with 3 
cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is 
restarting; the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will 
always return (host1, host2, host3). So it's possible that r2 will be scheduled 
to host1 by TaskScheduler. r3 is similar. In the end, it's possible that 
there are 3 receivers running on host1, while host2 and host3 are idle.

This issue can be fixed by returning only executors that have the minimum weight 
rather than returning at least 3 executors.
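
The two fixes can be illustrated with a minimal sketch. This is not the code in `ReceiverSchedulingPolicy`; the object, the `Assignments` type, and the weighting rule (each receiver adds `1.0 / number of its executors` to each executor it is assigned to) are simplifying assumptions, so the numbers will not match the 1.0/0.5 weights quoted in case 1.

```scala
object ReschedulingSketch {
  // Hypothetical model: receiver id -> executors it is scheduled or running on.
  type Assignments = Map[Int, Seq[String]]

  // Sum the assumed per-receiver weights for every known executor.
  def executorWeights(executors: Seq[String], assignments: Assignments): Map[String, Double] = {
    val base: Map[String, Double] = executors.map(_ -> 0.0).toMap
    assignments.values
      .flatMap(hosts => hosts.map(h => h -> 1.0 / hosts.size))
      .foldLeft(base) { case (acc, (host, w)) =>
        // Ignore hosts that are not in the current executor list.
        if (acc.contains(host)) acc.updated(host, acc(host) + w) else acc
      }
  }

  def reschedule(receiverId: Int, executors: Seq[String], assignments: Assignments): Seq[String] =
    if (executors.isEmpty) Seq.empty
    else {
      // Fix 1: drop the rescheduling receiver's own entry before weighing.
      val weights = executorWeights(executors, assignments - receiverId)
      // Fix 2: return only the executors with the minimum weight.
      // Exact Double comparison is acceptable here because equal weights
      // are built from identical sums; real code may need more care.
      val minWeight = executors.map(weights).min
      executors.filter(weights(_) == minWeight)
    }
}
```

Under these assumptions, case 1 (r1 and r5 on host1, r2 to r4 on host2 to host4) yields all four hosts for r1, so host1 is accepted; case 2 (r1 running on host1, r2 restarting) yields only host2 and host3.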

Author: zsxwing 

Closes #8340 from zsxwing/fix-receiver-scheduling.

(cherry picked from commit f023aa2fcc1d1dbb82aee568be0a8f2457c309ae)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1357f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1357f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1357f3

Branch: refs/heads/branch-1.5
Commit: bb1357f362cdd96b854c2a0a193496ce709cdbdd
Parents: 88991dc
Author: zsxwing 
Authored: Mon Aug 24 23:34:50 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 23:35:02 2015 -0700

--
 .../scheduler/ReceiverSchedulingPolicy.scala|  58 +++---
 .../streaming/scheduler/ReceiverTracker.scala   | 106 ---
 .../ReceiverSchedulingPolicySuite.scala |  13 +--
 3 files changed, 120 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb1357f3/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ef5b687..10b5a7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,36 @@ import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
 
+/**
+ * A class that tries to schedule receivers with evenly distributed. There are two phases for
+ * scheduling receivers.
+ *
+ * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
+ *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
+ *   It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
+ *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
+ *   contains the scheduled locations. Then when a receiver is starting, it will send a register
+ *   request and `ReceiverTracker.registerReceiver` will be called. In
+ *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
+ *   if the location of this receiver is one of the scheduled executors, if not, the register will
+ *   be rejected.
+ * - The second phase is local scheduling when a receiver is restarting. There are two cases of
+ *   receiver restarting:
+ *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
+ * executors mismatching, in other words, it fails to start in one of the locations that
+ * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
+ * still alive in the list of scheduled executors, then use them to launch the receiver job.
+ *   - If a rec

spark git commit: [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results

2015-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master d9c25dec8 -> f023aa2fc


[SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers 
returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors (host1, host2, host3, host4) and 5 receivers 
(r1, r2, r3, r4, r5). Then `ReceiverSchedulingPolicy.scheduleReceivers` will 
return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Assume r1 starts first on `host1`, as `scheduleReceivers` suggested, and tries 
to register with ReceiverTracker. But the previous 
`ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) 
according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 
0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected 
since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring the entry in `receiverTrackingInfoMap` for 
the receiver that is being rescheduled.

2) Assume there are 3 executors (host1, host2, host3), each with 3 cores, and 
3 receivers: r1, r2, r3. Assume r1 is running on host1. When r2 restarts, the 
previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return 
(host1, host2, host3), so TaskScheduler may schedule r2 to host1. The same 
applies to r3. In the end, all 3 receivers may be running on host1 while host2 
and host3 are idle.

This issue can be fixed by returning only executors that have the minimum weight 
rather than returning at least 3 executors.
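
The fix for case 1 can be illustrated with a toy model. The sketch below is not 
the code in this commit: `RescheduleSketch`, `executorWeights`, `candidates`, 
and the weight formula are assumptions made for the example, so the absolute 
weights it computes need not match the ones quoted above. It only shows the 
effect of leaving the rescheduling receiver's own entry out of the weights.

```scala
object RescheduleSketch {
  /**
   * Toy weight model (an assumption of this sketch): each receiver adds
   * 1.0 / (number of its scheduled executors) to every executor it is scheduled on.
   * The receiver named in `ignore`, if any, contributes nothing.
   */
  def executorWeights(
      scheduled: Map[String, Seq[String]],
      executors: Seq[String],
      ignore: Option[String]): Map[String, Double] = {
    val zero: Map[String, Double] = executors.map(_ -> 0.0).toMap
    scheduled
      .filter { case (receiver, _) => !ignore.contains(receiver) }
      .foldLeft(zero) { case (acc, (_, hosts)) =>
        hosts.foldLeft(acc) { (m, host) =>
          m.updated(host, m.getOrElse(host, 0.0) + 1.0 / hosts.size)
        }
      }
  }

  /** Executors that share the minimum weight. */
  def candidates(weights: Map[String, Double]): Set[String] =
    if (weights.isEmpty) {
      Set.empty
    } else {
      val minWeight = weights.values.reduce((a, b) => math.min(a, b))
      weights.filter(_._2 == minWeight).keySet
    }

  def main(args: Array[String]): Unit = {
    val executors = Seq("host1", "host2", "host3", "host4")
    val scheduled = Map(
      "r1" -> Seq("host1"), "r2" -> Seq("host2"), "r3" -> Seq("host3"),
      "r4" -> Seq("host4"), "r5" -> Seq("host1"))

    // Counting r1 against host1 pushes host1 out of the candidate set.
    val before = candidates(executorWeights(scheduled, executors, ignore = None))
    // Ignoring r1's own entry keeps host1 eligible, so r1 is not rejected.
    val after = candidates(executorWeights(scheduled, executors, ignore = Some("r1")))
    println(s"host1 eligible before: ${before.contains("host1")}, after: ${after.contains("host1")}")
  }
}
```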

Author: zsxwing 

Closes #8340 from zsxwing/fix-receiver-scheduling.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f023aa2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f023aa2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f023aa2f

Branch: refs/heads/master
Commit: f023aa2fcc1d1dbb82aee568be0a8f2457c309ae
Parents: d9c25de
Author: zsxwing 
Authored: Mon Aug 24 23:34:50 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 24 23:34:50 2015 -0700

--
 .../scheduler/ReceiverSchedulingPolicy.scala|  58 +++---
 .../streaming/scheduler/ReceiverTracker.scala   | 106 ---
 .../ReceiverSchedulingPolicySuite.scala |  13 +--
 3 files changed, 120 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f023aa2f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ef5b687..10b5a7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,36 @@ import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
 
+/**
+ * A class that tries to schedule receivers with evenly distributed. There are two phases for
+ * scheduling receivers.
+ *
+ * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
+ *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
+ *   It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
+ *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
+ *   contains the scheduled locations. Then when a receiver is starting, it will send a register
+ *   request and `ReceiverTracker.registerReceiver` will be called. In
+ *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
+ *   if the location of this receiver is one of the scheduled executors, if not, the register will
+ *   be rejected.
+ * - The second phase is local scheduling when a receiver is restarting. There are two cases of
+ *   receiver restarting:
+ *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
+ * executors mismatching, in other words, it fails to start in one of the locations that
+ * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
+ * still alive in the list of scheduled executors, then use them to launch the receiver job.
+ *   - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ * are dead, `Rec

spark git commit: [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.

2015-08-24 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master f023aa2fc -> df7041d02


[SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.

https://issues.apache.org/jira/browse/SPARK-10196

Author: Yin Huai 

Closes #8408 from yhuai/DecimalJsonSPARK-10196.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7041d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7041d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7041d0

Branch: refs/heads/master
Commit: df7041d02d3fd44b08a859f5d77bf6fb726895f0
Parents: f023aa2
Author: Yin Huai 
Authored: Mon Aug 24 23:38:32 2015 -0700
Committer: Davies Liu 
Committed: Mon Aug 24 23:38:32 2015 -0700

--
 .../datasources/json/JacksonGenerator.scala |  2 +-
 .../sql/sources/JsonHadoopFsRelationSuite.scala | 27 
 2 files changed, 28 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/df7041d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 99ac773..330ba90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -95,7 +95,7 @@ private[sql] object JacksonGenerator {
   case (FloatType, v: Float) => gen.writeNumber(v)
   case (DoubleType, v: Double) => gen.writeNumber(v)
   case (LongType, v: Long) => gen.writeNumber(v)
-  case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
+  case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal)
   case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
   case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
   case (BooleanType, v: Boolean) => gen.writeBoolean(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/df7041d0/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index ed6d512..8ca3a17 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.math.BigDecimal
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -75,4 +77,29 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
       )
     }
   }
+
+  test("SPARK-10196: save decimal type to JSON") {
+    withTempDir { file =>
+      file.delete()
+
+      val schema =
+        new StructType()
+          .add("decimal", DecimalType(7, 2))
+
+      val data =
+        Row(new BigDecimal("10.02")) ::
+          Row(new BigDecimal("2.99")) ::
+          Row(new BigDecimal("1")) :: Nil
+      val df = createDataFrame(sparkContext.parallelize(data), schema)
+
+      // Write the data out.
+      df.write.format(dataSourceName).save(file.getCanonicalPath)
+
+      // Read it back and check the result.
+      checkAnswer(
+        read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
+        df
+      )
+    }
+  }
 }
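
The one-line change to `JacksonGenerator` above swaps the type that the decimal 
case matches on. The sketch below illustrates why that matters. It is a 
simplified model and not Spark code: `InternalDecimal` is a hypothetical 
stand-in for Spark's `Decimal`, and `writeOld` / `writeNew` are made-up helpers 
that return `None` where the type pattern does not match.

```scala
import java.math.{BigDecimal => JBigDecimal}

object DecimalMatchSketch {
  /** Hypothetical wrapper standing in for an internal decimal representation. */
  final case class InternalDecimal(underlying: JBigDecimal) {
    def toJavaBigDecimal: JBigDecimal = underlying
  }

  /** Old-style pattern: only a raw java.math.BigDecimal is recognized. */
  def writeOld(value: Any): Option[String] = value match {
    case v: JBigDecimal => Some(v.toPlainString)
    case _ => None
  }

  /** New-style pattern: match the wrapper, then convert it before writing. */
  def writeNew(value: Any): Option[String] = value match {
    case v: InternalDecimal => Some(v.toJavaBigDecimal.toPlainString)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val internal = InternalDecimal(new JBigDecimal("10.02"))
    // The wrapper is not a java.math.BigDecimal, so the old pattern misses it.
    println(writeOld(internal).isDefined)
    println(writeNew(internal).isDefined)
  }
}
```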


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.

2015-08-24 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bb1357f36 -> 0b425ed3d


[SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.

https://issues.apache.org/jira/browse/SPARK-10196

Author: Yin Huai 

Closes #8408 from yhuai/DecimalJsonSPARK-10196.

(cherry picked from commit df7041d02d3fd44b08a859f5d77bf6fb726895f0)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b425ed3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b425ed3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b425ed3

Branch: refs/heads/branch-1.5
Commit: 0b425ed3d55f7e9c39a259ce4b8d86a41a7bd403
Parents: bb1357f
Author: Yin Huai 
Authored: Mon Aug 24 23:38:32 2015 -0700
Committer: Davies Liu 
Committed: Mon Aug 24 23:38:42 2015 -0700

--
 .../datasources/json/JacksonGenerator.scala |  2 +-
 .../sql/sources/JsonHadoopFsRelationSuite.scala | 27 
 2 files changed, 28 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0b425ed3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 99ac773..330ba90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -95,7 +95,7 @@ private[sql] object JacksonGenerator {
   case (FloatType, v: Float) => gen.writeNumber(v)
   case (DoubleType, v: Double) => gen.writeNumber(v)
   case (LongType, v: Long) => gen.writeNumber(v)
-  case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
+  case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal)
   case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
   case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
   case (BooleanType, v: Boolean) => gen.writeBoolean(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b425ed3/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index ed6d512..8ca3a17 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.math.BigDecimal
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -75,4 +77,29 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
       )
     }
   }
+
+  test("SPARK-10196: save decimal type to JSON") {
+    withTempDir { file =>
+      file.delete()
+
+      val schema =
+        new StructType()
+          .add("decimal", DecimalType(7, 2))
+
+      val data =
+        Row(new BigDecimal("10.02")) ::
+          Row(new BigDecimal("2.99")) ::
+          Row(new BigDecimal("1")) :: Nil
+      val df = createDataFrame(sparkContext.parallelize(data), schema)
+
+      // Write the data out.
+      df.write.format(dataSourceName).save(file.getCanonicalPath)
+
+      // Read it back and check the result.
+      checkAnswer(
+        read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
+        df
+      )
+    }
+  }
 }





spark git commit: [SPARK-10136] [SQL] A more robust fix for SPARK-10136

2015-08-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master df7041d02 -> bf03fe68d


[SPARK-10136] [SQL] A more robust fix for SPARK-10136

PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root 
cause.  The real problem is rather tricky to explain, and requires readers to 
be fairly familiar with the parquet-format spec, especially the details of the 
`LIST` backwards-compatibility rules.  Here is an attempt at an explanation.

The structure of the problematic Parquet schema generated by parquet-avro is 
something like this:

```
message m {
  <repetition> group f (LIST) {         // Level 1
    repeated group array (LIST) {       // Level 2
      repeated <primitive-type> array;  // Level 3
    }
  }
}
```

(The schema generated by parquet-thrift is structurally similar, just replace 
the `array` at level 2 with `f_tuple`, and the other one at level 3 with 
`f_tuple_tuple`.)

This structure consists of two nested legacy 2-level `LIST`-like structures:

1. The repeated group type at level 2 is the element type of the outer array 
defined at level 1.

   This group should map to a `CatalystArrayConverter.ElementConverter` when 
building converters.

2. The repeated primitive type at level 3 is the element type of the inner 
array defined at level 2.

   This type should also map to a `CatalystArrayConverter.ElementConverter`.

The root cause of SPARK-10136 is that the group at level 2 isn't properly 
recognized as the element type of level 1.  Thus, according to the 
parquet-format spec, the repeated primitive at level 3 is left as a so-called 
"unannotated repeated primitive type" and is treated as a required list of 
required primitives, so a `RepeatedPrimitiveConverter` is created for it 
instead of a `CatalystArrayConverter.ElementConverter`.

According to the parquet-format spec, unannotated repeated types shouldn't 
appear in a `LIST`- or `MAP`-annotated group.  PR #8341 fixed this issue by 
allowing such unannotated repeated types to appear in `LIST`-annotated groups, 
which is a non-standard, hacky, but valid fix.  (I didn't realize this when 
authoring #8341 though.)

Level 2 isn't recognized as a list element type because of the following 
`LIST` backwards-compatibility rule defined in the parquet-format spec:

> If the repeated field is a group with one field and is named either `array` 
> or uses the `LIST`-annotated group's name with `_tuple` appended then the 
> repeated type is the element type and elements are required.

(The `array` part is for parquet-avro compatibility, while the `_tuple` part is 
for parquet-thrift.)

This rule is implemented in [`CatalystSchemaConverter.isElementType`] [1], but 
neglected in [`CatalystRowConverter.isElementType`] [2].  This PR delivers a 
more robust fix by adding this rule in the latter method.
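
The quoted rule can be sketched on a toy schema model. This is an illustration 
only: `Field`, `Primitive`, `Group`, and `isElementByLegacyName` are 
hypothetical names that do not come from Spark or Parquet, and the sketch 
covers just the quoted rule, not any other checks the real `isElementType` 
methods may perform.

```scala
object ListCompatSketch {
  /** Toy stand-ins for Parquet schema nodes (assumed for this example). */
  sealed trait Field { def name: String }
  final case class Primitive(name: String) extends Field
  final case class Group(name: String, fields: Seq[Field]) extends Field

  /**
   * True if `repeated` is a single-field group named `array` (parquet-avro)
   * or `<parentName>_tuple` (parquet-thrift), in which case the repeated
   * type is itself the element type of the enclosing LIST.
   */
  def isElementByLegacyName(repeated: Field, parentName: String): Boolean =
    repeated match {
      case Group(name, fields) if fields.size == 1 =>
        name == "array" || name == parentName + "_tuple"
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    // Level 2 of the parquet-avro schema above: a group named `array` with one field.
    val avroLevel2 = Group("array", Seq(Primitive("array")))
    // The parquet-thrift variant: `f_tuple` containing `f_tuple_tuple`.
    val thriftLevel2 = Group("f_tuple", Seq(Primitive("f_tuple_tuple")))
    println(isElementByLegacyName(avroLevel2, "f"))
    println(isElementByLegacyName(thriftLevel2, "f"))
  }
}
```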

Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found 
at [PARQUET-364] [3].

[1]: 
https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305
[2]: 
https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463
[3]: https://issues.apache.org/jira/browse/PARQUET-364

Author: Cheng Lian 

Closes #8361 from liancheng/spark-10136/proper-version.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf03fe68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf03fe68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf03fe68

Branch: refs/heads/master
Commit: bf03fe68d62f33dda70dff45c3bda1f57b032dfc
Parents: df7041d
Author: Cheng Lian 
Authored: Tue Aug 25 14:58:42 2015 +0800
Committer: Cheng Lian 
Committed: Tue Aug 25 14:58:42 2015 +0800

--
 .../parquet/CatalystRowConverter.scala| 18 --
 1 file changed, 8 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bf03fe68/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index d2c2db5..cbf0704 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -415,8 +415,9 @@ private[parquet] class CatalystRowConverter(
 private val elementConve

spark git commit: [SPARK-10136] [SQL] A more robust fix for SPARK-10136

2015-08-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0b425ed3d -> 95a14e9f2


[SPARK-10136] [SQL] A more robust fix for SPARK-10136

PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root 
cause.  The real problem is rather tricky to explain, and requires readers to 
be fairly familiar with the parquet-format spec, especially the details of the 
`LIST` backwards-compatibility rules.  Here is an attempt at an explanation.

The structure of the problematic Parquet schema generated by parquet-avro is 
something like this:

```
message m {
  <repetition> group f (LIST) {         // Level 1
    repeated group array (LIST) {       // Level 2
      repeated <primitive-type> array;  // Level 3
    }
  }
}
```

(The schema generated by parquet-thrift is structurally similar, just replace 
the `array` at level 2 with `f_tuple`, and the other one at level 3 with 
`f_tuple_tuple`.)

This structure consists of two nested legacy 2-level `LIST`-like structures:

1. The repeated group type at level 2 is the element type of the outer array 
defined at level 1.

   This group should map to a `CatalystArrayConverter.ElementConverter` when 
building converters.

2. The repeated primitive type at level 3 is the element type of the inner 
array defined at level 2.

   This type should also map to a `CatalystArrayConverter.ElementConverter`.

The root cause of SPARK-10136 is that the group at level 2 isn't properly 
recognized as the element type of level 1.  Thus, according to the 
parquet-format spec, the repeated primitive at level 3 is left as a so-called 
"unannotated repeated primitive type" and is treated as a required list of 
required primitives, so a `RepeatedPrimitiveConverter` is created for it 
instead of a `CatalystArrayConverter.ElementConverter`.

According to the parquet-format spec, unannotated repeated types shouldn't 
appear in a `LIST`- or `MAP`-annotated group.  PR #8341 fixed this issue by 
allowing such unannotated repeated types to appear in `LIST`-annotated groups, 
which is a non-standard, hacky, but valid fix.  (I didn't realize this when 
authoring #8341 though.)

Level 2 isn't recognized as a list element type because of the following 
`LIST` backwards-compatibility rule defined in the parquet-format spec:

> If the repeated field is a group with one field and is named either `array` 
> or uses the `LIST`-annotated group's name with `_tuple` appended then the 
> repeated type is the element type and elements are required.

(The `array` part is for parquet-avro compatibility, while the `_tuple` part is 
for parquet-thrift.)

This rule is implemented in [`CatalystSchemaConverter.isElementType`] [1], but 
neglected in [`CatalystRowConverter.isElementType`] [2].  This PR delivers a 
more robust fix by adding this rule in the latter method.

Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found 
at [PARQUET-364] [3].

[1]: 
https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305
[2]: 
https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463
[3]: https://issues.apache.org/jira/browse/PARQUET-364

Author: Cheng Lian 

Closes #8361 from liancheng/spark-10136/proper-version.

(cherry picked from commit bf03fe68d62f33dda70dff45c3bda1f57b032dfc)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95a14e9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95a14e9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95a14e9f

Branch: refs/heads/branch-1.5
Commit: 95a14e9f232a22548d97c8704d781d089188
Parents: 0b425ed
Author: Cheng Lian 
Authored: Tue Aug 25 14:58:42 2015 +0800
Committer: Cheng Lian 
Committed: Tue Aug 25 14:58:57 2015 +0800

--
 .../parquet/CatalystRowConverter.scala| 18 --
 1 file changed, 8 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95a14e9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index d2c2db5..cbf0704 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConve