spark git commit: [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars
Repository: spark Updated Branches: refs/heads/master 053d94fcf -> 4e0395ddb [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars This PR removed the `outputFile` configuration from pom.xml and updated `tests.py` to search jars for both sbt build and maven build. I ran ` mvn -Pkinesis-asl -DskipTests clean install` locally, and verified the jars in my local repository were correct. I also checked Python tests for maven build, and it passed all tests. Author: zsxwing Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits: e0b5818 [zsxwing] Fix the sbt build c697627 [zsxwing] Add the jar pathes to the exception message be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e0395dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e0395dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e0395dd Branch: refs/heads/master Commit: 4e0395ddb764d092b5b38447af49e196e590e0f0 Parents: 053d94f Author: zsxwing Authored: Mon Aug 24 12:38:01 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 12:38:01 2015 -0700 -- external/flume-assembly/pom.xml | 1 - external/kafka-assembly/pom.xml | 1 - external/mqtt-assembly/pom.xml | 1 - extras/kinesis-asl-assembly/pom.xml | 1 - python/pyspark/streaming/tests.py | 47 ++-- 5 files changed, 26 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index e05e431..561ed4b 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -115,7 +115,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 36342f3..6f4e2a8 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -142,7 +142,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/external/mqtt-assembly/pom.xml -- diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index f3e3f93..8412600 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -132,7 +132,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/extras/kinesis-asl-assembly/pom.xml -- diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 3ca5386..51af3e6 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -137,7 +137,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/4e0395dd/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 510a4f2..cfea95b 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ 
-1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase): kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) +# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because +# the artifact jars are in different directories. +def search_jar(dir, name_prefix): +# We should ignore the following jars +ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar") +jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) + # sbt build +glob.glob(os.p
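[Editor's note] The tests.py hunk above is cut off in this archive. As a rough reconstruction of the approach the commit message describes -- not the committed code verbatim -- the sketch below looks for an assembly jar under both the sbt output directory (target/scala-*/) and the Maven output directory (target/), skipping javadoc/sources/tests jars; the Maven glob pattern and the example paths are assumptions made for illustration.

import glob
import os


def search_jar(dir, name_prefix):
    # Reconstruction sketch -- directory patterns are assumptions, not the committed code.
    # Helper jars that should never be picked up as the assembly artifact.
    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
    jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) +   # sbt build
            glob.glob(os.path.join(dir, "target/" + name_prefix + "_*.jar")))            # Maven build
    return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)]


# Illustrative use (paths are assumptions): fail loudly, listing the candidates found,
# in the spirit of the "Add the jar paths to the exception message" squash commit.
jars = search_jar("extras/kinesis-asl-assembly", "spark-streaming-kinesis-asl-assembly")
if len(jars) != 1:
    raise Exception("Expected exactly one assembly jar, found: %s" % jars)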
spark git commit: [SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent unnecessary classes from showing up in the docs
Repository: spark Updated Branches: refs/heads/master 4e0395ddb -> 7478c8b66 [SPARK-9791] [PACKAGE] Change private class to private class to prevent unnecessary classes from showing up in the docs In addition, some random cleanup of import ordering Author: Tathagata Das Closes #8387 from tdas/SPARK-9791 and squashes the following commits: 67f3ee9 [Tathagata Das] Change private class to private[package] class to prevent them from showing up in the docs Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7478c8b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7478c8b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7478c8b6 Branch: refs/heads/master Commit: 7478c8b66d6a2b1179f20c38b49e27e37b0caec3 Parents: 4e0395d Author: Tathagata Das Authored: Mon Aug 24 12:40:09 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 12:40:09 2015 -0700 -- .../spark/streaming/flume/FlumeUtils.scala | 2 +- .../apache/spark/streaming/kafka/Broker.scala | 6 ++-- .../spark/streaming/kafka/KafkaTestUtils.scala | 10 +++--- .../spark/streaming/kafka/KafkaUtils.scala | 36 +--- .../spark/streaming/kafka/OffsetRange.scala | 8 - .../apache/spark/streaming/mqtt/MQTTUtils.scala | 6 ++-- .../spark/streaming/mqtt/MQTTTestUtils.scala| 2 +- .../streaming/kinesis/KinesisTestUtils.scala| 2 +- .../spark/streaming/util/WriteAheadLog.java | 2 ++ .../util/WriteAheadLogRecordHandle.java | 2 ++ .../receiver/ReceivedBlockHandler.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../apache/spark/streaming/ui/BatchPage.scala | 2 +- 13 files changed, 28 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala -- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 095bfb0..a65a9b9 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -247,7 +247,7 @@ object FlumeUtils { * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's FlumeUtils. */ -private class FlumeUtilsPythonHelper { +private[flume] class FlumeUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala index 5a74feb..9159051 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka import org.apache.spark.annotation.Experimental /** - * :: Experimental :: - * Represent the host and port info for a Kafka broker. - * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID + * Represents the host and port info for a Kafka broker. + * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID. 
*/ -@Experimental final class Broker private( /** Broker's hostname */ val host: String, http://git-wip-us.apache.org/repos/asf/spark/blob/7478c8b6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index b608b75..79a9db4 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka import java.io.File import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap} -import java.util.Properties import java.util.concurrent.TimeoutException +import java.util.{Map => JMap, Properties} import scala.annotation.tailrec import scala.language.postfixOps @@
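[Editor's note] For readers less familiar with Scala's qualified access modifiers, here is a small self-contained sketch of the change being applied throughout this commit; the class and method names below are made up, only the modifier syntax mirrors the patch. A `private[flume]` class is accessible only from code inside org.apache.spark.streaming.flume (and its sub-packages), which is what the commit relies on to keep such helpers out of the generated docs.

package org.apache.spark.streaming.flume

// Illustrative sketch; HelperSketch is not a real Spark class.
// Visible only to code in org.apache.spark.streaming.flume and its sub-packages,
// exactly like the FlumeUtilsPythonHelper change above.
private[flume] class HelperSketch {
  def ping(): String = "pong"
}

// Within the same package this compiles fine:
//   new HelperSketch().ping()
// From an unrelated package, HelperSketch is not accessible at all.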
spark git commit: [SPARK-9791] [PACKAGE] Change private class to private[package] class to prevent unnecessary classes from showing up in the docs
Repository: spark Updated Branches: refs/heads/branch-1.5 36bc50c8d -> d003373bd [SPARK-9791] [PACKAGE] Change private class to private class to prevent unnecessary classes from showing up in the docs In addition, some random cleanup of import ordering Author: Tathagata Das Closes #8387 from tdas/SPARK-9791 and squashes the following commits: 67f3ee9 [Tathagata Das] Change private class to private[package] class to prevent them from showing up in the docs (cherry picked from commit 7478c8b66d6a2b1179f20c38b49e27e37b0caec3) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d003373b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d003373b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d003373b Branch: refs/heads/branch-1.5 Commit: d003373bd8557ed255125940f736e44f8722e8e3 Parents: 36bc50c Author: Tathagata Das Authored: Mon Aug 24 12:40:09 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 12:40:23 2015 -0700 -- .../spark/streaming/flume/FlumeUtils.scala | 2 +- .../apache/spark/streaming/kafka/Broker.scala | 6 ++-- .../spark/streaming/kafka/KafkaTestUtils.scala | 10 +++--- .../spark/streaming/kafka/KafkaUtils.scala | 36 +--- .../spark/streaming/kafka/OffsetRange.scala | 8 - .../apache/spark/streaming/mqtt/MQTTUtils.scala | 6 ++-- .../spark/streaming/mqtt/MQTTTestUtils.scala| 2 +- .../streaming/kinesis/KinesisTestUtils.scala| 2 +- .../spark/streaming/util/WriteAheadLog.java | 2 ++ .../util/WriteAheadLogRecordHandle.java | 2 ++ .../receiver/ReceivedBlockHandler.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../apache/spark/streaming/ui/BatchPage.scala | 2 +- 13 files changed, 28 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala -- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 095bfb0..a65a9b9 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -247,7 +247,7 @@ object FlumeUtils { * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's FlumeUtils. */ -private class FlumeUtilsPythonHelper { +private[flume] class FlumeUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala index 5a74feb..9159051 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka import org.apache.spark.annotation.Experimental /** - * :: Experimental :: - * Represent the host and port info for a Kafka broker. - * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID + * Represents the host and port info for a Kafka broker. 
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID. */ -@Experimental final class Broker private( /** Broker's hostname */ val host: String, http://git-wip-us.apache.org/repos/asf/spark/blob/d003373b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index b608b75..79a9db4 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka import java.io.File import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap} -import java.util.Properties import java.util.concurrent.TimeoutException +import
spark git commit: [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars
Repository: spark Updated Branches: refs/heads/branch-1.5 b40059dbd -> 36bc50c8d [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars This PR removed the `outputFile` configuration from pom.xml and updated `tests.py` to search jars for both sbt build and maven build. I ran ` mvn -Pkinesis-asl -DskipTests clean install` locally, and verified the jars in my local repository were correct. I also checked Python tests for maven build, and it passed all tests. Author: zsxwing Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits: e0b5818 [zsxwing] Fix the sbt build c697627 [zsxwing] Add the jar pathes to the exception message be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars (cherry picked from commit 4e0395ddb764d092b5b38447af49e196e590e0f0) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36bc50c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36bc50c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36bc50c8 Branch: refs/heads/branch-1.5 Commit: 36bc50c8d377f3e628f7d608d58a76ea508e9697 Parents: b40059d Author: zsxwing Authored: Mon Aug 24 12:38:01 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 12:38:10 2015 -0700 -- external/flume-assembly/pom.xml | 1 - external/kafka-assembly/pom.xml | 1 - external/mqtt-assembly/pom.xml | 1 - extras/kinesis-asl-assembly/pom.xml | 1 - python/pyspark/streaming/tests.py | 47 ++-- 5 files changed, 26 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index e05e431..561ed4b 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -115,7 +115,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 36342f3..6f4e2a8 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -142,7 +142,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/external/mqtt-assembly/pom.xml -- diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index f3e3f93..8412600 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -132,7 +132,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/extras/kinesis-asl-assembly/pom.xml -- diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 3ca5386..51af3e6 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -137,7 +137,6 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar *:* http://git-wip-us.apache.org/repos/asf/spark/blob/36bc50c8/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 
510a4f2..cfea95b 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase): kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) +# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because +# the artifact jars are in different directories. +def search_jar(dir, name_prefix): +# We should ignore the following jars +ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar") +jars = (glob.glo
spark git commit: [SPARK-10169] [SQL] [BRANCH-1.3] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate function
Repository: spark Updated Branches: refs/heads/branch-1.3 a98603f8c -> 3d2eaf0a7 [SPARK-10169] [SQL] [BRANCH-1.3] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate fucntion https://issues.apache.org/jira/browse/SPARK-10169 Author: Wenchen Fan Author: Yin Huai Closes #8380 from yhuai/aggTransformDown-branch1.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d2eaf0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d2eaf0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d2eaf0a Branch: refs/heads/branch-1.3 Commit: 3d2eaf0a7701bfd9a41ba4c1b29e5be77156a9bf Parents: a98603f Author: Wenchen Fan Authored: Mon Aug 24 13:00:49 2015 -0700 Committer: Michael Armbrust Committed: Mon Aug 24 13:00:49 2015 -0700 -- .../spark/sql/catalyst/planning/patterns.scala | 14 +++-- .../org/apache/spark/sql/SQLQuerySuite.scala| 22 2 files changed, 34 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3d2eaf0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 9c8c643..d0ebe24 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -151,7 +151,10 @@ object PartialAggregation { // Replace aggregations with a new expression that computes the result from the already // computed partial evaluations and grouping values. -val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp { +// transformDown is needed at here because we want to match aggregate function first. +// Otherwise, if a grouping expression is used as an argument of an aggregate function, +// we will match grouping expression first and have a wrong plan. +val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformDown { case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) => partialEvaluations(new TreeNodeRef(e)).finalEvaluation @@ -159,8 +162,15 @@ object PartialAggregation { // Should trim aliases around `GetField`s. These aliases are introduced while // resolving struct field accesses, because `GetField` is not a `NamedExpression`. // (Should we just turn `GetField` into a `NamedExpression`?) +def trimAliases(e: Expression): Expression = + e.transform { case Alias(g: GetField, _) => g } +val trimmed = e match { + // Don't trim the top level Alias. 
+ case Alias(child, name) => Alias(trimAliases(child), name)() + case _ => trimAliases(e) +} namedGroupingExpressions - .get(e.transform { case Alias(g: GetField, _) => g }) + .get(trimmed) .map(_.toAttribute) .getOrElse(e) }).asInstanceOf[Seq[NamedExpression]] http://git-wip-us.apache.org/repos/asf/spark/blob/3d2eaf0a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 87e7cf8..b52b606 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1099,4 +1099,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1)) checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1)) } + + test("SPARK-10169: grouping expressions used as arguments of aggregate functions.") { +sqlCtx.sparkContext + .parallelize((1 to 1000), 50) + .map(i => Tuple1(i)) + .toDF("i") + .registerTempTable("t") + +val query = sqlCtx.sql( + """ +|select i % 10, sum(if(i % 10 = 5, 1, 0)), count(i) +|from t +|where i % 10 = 5 +|group by i % 10 + """.stripMargin) + +checkAnswer( + query, + Row(5, 100, 100)) + +dropTempTable("t") + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
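[Editor's note] The key change in this patch is the switch from transformUp to transformDown when rewriting aggregate expressions. The self-contained sketch below is not Catalyst code -- the tiny Expr tree and its two traversal methods are stand-ins written for illustration -- but it reproduces the ordering problem: with a bottom-up traversal, the grouping expression `i % 10` buried inside `sum(if(i % 10 = 5, 1, 0))` is rewritten first, after which the enclosing aggregate no longer matches its precomputed partial result; a top-down traversal tries the whole aggregate first, which is the behaviour the patch switches to.

// Not Catalyst: a toy expression tree, just to illustrate rule-application order.
sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(cs: Seq[Expr]): Expr

  // Pre-order: apply the rule to this node first, then descend into the result's children.
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(this, identity[Expr])
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: rewrite the children first, then try the rule on the rebuilt node.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rebuilt = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rebuilt, identity[Expr])
  }
}

case class Leaf(name: String) extends Expr {
  def children: Seq[Expr] = Nil
  def withChildren(cs: Seq[Expr]): Expr = this
}

case class Call(name: String, children: Seq[Expr]) extends Expr {
  def withChildren(cs: Seq[Expr]): Expr = copy(children = cs)
}

object TransformOrderDemo extends App {
  // Stand-ins for the grouping expression `i % 10` and the aggregate `sum(if(i % 10 = 5, 1, 0))`.
  val groupingExpr = Call("%", Seq(Leaf("i"), Leaf("10")))
  val aggExpr = Call("sum",
    Seq(Call("if", Seq(Call("=", Seq(groupingExpr, Leaf("5"))), Leaf("1"), Leaf("0")))))

  // Both the whole aggregate and the grouping expression have precomputed replacements,
  // mirroring partialEvaluations / namedGroupingExpressions in the real rule.
  val rewrite: PartialFunction[Expr, Expr] = {
    case e if e == aggExpr      => Leaf("partialSumResult")
    case e if e == groupingExpr => Leaf("groupingAttribute")
  }

  // Bottom-up replaces `i % 10` inside the aggregate first, so the aggregate never matches:
  println(aggExpr.transformUp(rewrite))   // Call(sum, ...groupingAttribute...)  <- the wrong plan
  // Top-down matches the aggregate itself before descending:
  println(aggExpr.transformDown(rewrite)) // Leaf(partialSumResult)
}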
spark git commit: [SPARK-10169] [SQL] [BRANCH-1.4] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate function
Repository: spark Updated Branches: refs/heads/branch-1.4 c73498773 -> 2671551a9 [SPARK-10169] [SQL] [BRANCH-1.4] Partial aggregation's plan is wrong when a grouping expression is used as an argument of the aggregate fucntion https://issues.apache.org/jira/browse/SPARK-10169 Author: Yin Huai Author: Wenchen Fan Closes #8379 from yhuai/aggTransformDown-branch1.4. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2671551a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2671551a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2671551a Branch: refs/heads/branch-1.4 Commit: 2671551a94f203bcfb3d0ab11e551c2f9865f4ea Parents: c734987 Author: Yin Huai Authored: Mon Aug 24 13:02:06 2015 -0700 Committer: Michael Armbrust Committed: Mon Aug 24 13:02:06 2015 -0700 -- .../spark/sql/catalyst/planning/patterns.scala | 13 ++-- .../org/apache/spark/sql/SQLQuerySuite.scala| 22 2 files changed, 33 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2671551a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 1dd75a8..c1b88d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -151,7 +151,10 @@ object PartialAggregation { // Replace aggregations with a new expression that computes the result from the already // computed partial evaluations and grouping values. -val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp { +// transformDown is needed at here because we want to match aggregate function first. +// Otherwise, if a grouping expression is used as an argument of an aggregate function, +// we will match grouping expression first and have a wrong plan. +val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformDown { case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) => partialEvaluations(new TreeNodeRef(e)).finalEvaluation @@ -159,7 +162,13 @@ object PartialAggregation { // Should trim aliases around `GetField`s. These aliases are introduced while // resolving struct field accesses, because `GetField` is not a `NamedExpression`. // (Should we just turn `GetField` into a `NamedExpression`?) -val trimmed = e.transform { case Alias(g: ExtractValue, _) => g } +def trimAliases(e: Expression): Expression = + e.transform { case Alias(g: ExtractValue, _) => g } +val trimmed = e match { + // Don't trim the top level Alias. 
+ case Alias(child, name) => Alias(trimAliases(child), name)() + case _ => trimAliases(e) +} namedGroupingExpressions .find { case (k, v) => k semanticEquals trimmed } .map(_._2.toAttribute) http://git-wip-us.apache.org/repos/asf/spark/blob/2671551a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8a0679e..1067b94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1335,4 +1335,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1)) } + + test("SPARK-10169: grouping expressions used as arguments of aggregate functions.") { +sqlCtx.sparkContext + .parallelize((1 to 1000), 50) + .map(i => Tuple1(i)) + .toDF("i") + .registerTempTable("t") + +val query = sqlCtx.sql( + """ +|select i % 10, sum(if(i % 10 = 5, 1, 0)), count(i) +|from t +|where i % 10 = 5 +|group by i % 10 + """.stripMargin) + +checkAnswer( + query, + Row(5, 100, 100)) + +dropTempTable("t") + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions
Repository: spark Updated Branches: refs/heads/branch-1.5 d003373bd -> 43dcf95e4 [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions This PR contains examples on how to use some of the Stat Functions available for DataFrames under `df.stat`. rxin Author: Burak Yavuz Closes #8378 from brkyvz/update-sql-docs. (cherry picked from commit 9ce0c7ad333f4a3c01207e5e9ed42bcafb99d894) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43dcf95e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43dcf95e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43dcf95e Branch: refs/heads/branch-1.5 Commit: 43dcf95e42eb77c7cd545179c461bb7f9430e0e3 Parents: d003373 Author: Burak Yavuz Authored: Mon Aug 24 13:48:01 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 13:48:09 2015 -0700 -- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../spark/sql/DataFrameStatFunctions.scala | 101 +++ 2 files changed, 102 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43dcf95e/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5bed299..ae341c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -684,7 +684,7 @@ class DataFrame private[sql]( // make it a NamedExpression. case Column(u: UnresolvedAttribute) => UnresolvedAlias(u) case Column(expr: NamedExpression) => expr - // Leave an unaliased explode with an empty list of names since the analzyer will generate the + // Leave an unaliased explode with an empty list of names since the analyzer will generate the // correct defaults after the nested expression's type has been resolved. case Column(explode: Explode) => MultiAlias(explode, Nil) case Column(expr: Expression) => Alias(expr, expr.prettyString)() http://git-wip-us.apache.org/repos/asf/spark/blob/43dcf95e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 2e68e35..69c9847 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -39,6 +39,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the second column * @return the covariance of the two columns. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.cov("rand1", "rand2") + *res1: Double = 0.065... + * }}} + * * @since 1.4.0 */ def cov(col1: String, col2: String): Double = { @@ -54,6 +61,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.corr("rand1", "rand2") + *res1: Double = 0.613... 
+ * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String, method: String): Double = { @@ -69,6 +83,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.corr("rand1", "rand2", "pearson") + *res1: Double = 0.613... + * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String): Double = { @@ -92,6 +113,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * of the DataFrame. * @return A DataFrame containing for the contingency table. * + * {{{ + *val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), + * (3, 3))).toDF("key", "value") + *val ct = df.stat.crosstab("key", "value") + *ct.show() + *+-+---+---+---+ + *
spark git commit: [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions
Repository: spark Updated Branches: refs/heads/master 7478c8b66 -> 9ce0c7ad3 [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions This PR contains examples on how to use some of the Stat Functions available for DataFrames under `df.stat`. rxin Author: Burak Yavuz Closes #8378 from brkyvz/update-sql-docs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ce0c7ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ce0c7ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ce0c7ad Branch: refs/heads/master Commit: 9ce0c7ad333f4a3c01207e5e9ed42bcafb99d894 Parents: 7478c8b Author: Burak Yavuz Authored: Mon Aug 24 13:48:01 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 13:48:01 2015 -0700 -- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../spark/sql/DataFrameStatFunctions.scala | 101 +++ 2 files changed, 102 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9ce0c7ad/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d6688b2..791c10c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -684,7 +684,7 @@ class DataFrame private[sql]( // make it a NamedExpression. case Column(u: UnresolvedAttribute) => UnresolvedAlias(u) case Column(expr: NamedExpression) => expr - // Leave an unaliased explode with an empty list of names since the analzyer will generate the + // Leave an unaliased explode with an empty list of names since the analyzer will generate the // correct defaults after the nested expression's type has been resolved. case Column(explode: Explode) => MultiAlias(explode, Nil) case Column(expr: Expression) => Alias(expr, expr.prettyString)() http://git-wip-us.apache.org/repos/asf/spark/blob/9ce0c7ad/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 2e68e35..69c9847 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -39,6 +39,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the second column * @return the covariance of the two columns. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.cov("rand1", "rand2") + *res1: Double = 0.065... + * }}} + * * @since 1.4.0 */ def cov(col1: String, col2: String): Double = { @@ -54,6 +61,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.corr("rand1", "rand2") + *res1: Double = 0.613... 
+ * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String, method: String): Double = { @@ -69,6 +83,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + *val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + *df.stat.corr("rand1", "rand2", "pearson") + *res1: Double = 0.613... + * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String): Double = { @@ -92,6 +113,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * of the DataFrame. * @return A DataFrame containing for the contingency table. * + * {{{ + *val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), + * (3, 3))).toDF("key", "value") + *val ct = df.stat.crosstab("key", "value") + *ct.show() + *+-+---+---+---+ + *|key_value| 1| 2| 3| + *+-+---+---+---+ + *|2| 2| 0| 1| + *|
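[Editor's note] Taken together, the new scaladoc snippets above amount to the following end-to-end usage. This is a spark-shell style sketch against the 1.5 APIs (sqlContext assumed to be in scope, numeric results depend on the random seeds), not code taken from the commit itself.

import org.apache.spark.sql.functions.rand

// Usage sketch, not code from the commit: two columns of pseudo-random doubles keyed by id,
// as in the cov/corr examples above.
val df = sqlContext.range(0, 10)
  .withColumn("rand1", rand(seed = 10))
  .withColumn("rand2", rand(seed = 27))

val covariance  = df.stat.cov("rand1", "rand2")              // a small Double near zero
val correlation = df.stat.corr("rand1", "rand2")             // Pearson correlation by default
val same        = df.stat.corr("rand1", "rand2", "pearson")  // explicit method name

// Contingency table between two low-cardinality columns, as in the crosstab example above.
val kv = sqlContext.createDataFrame(
  Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), (3, 3))).toDF("key", "value")
kv.stat.crosstab("key", "value").show()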
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.0-rc2 [deleted] e2569282a
spark git commit: [SPARK-10144] [UI] Actually show peak execution memory by default
Repository: spark Updated Branches: refs/heads/branch-1.5 43dcf95e4 -> 831f78ee5 [SPARK-10144] [UI] Actually show peak execution memory by default The peak execution memory metric was introduced in SPARK-8735. That was before Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` must be explicitly set to true. The result is that the memory is not displayed by default. Author: Andrew Or Closes #8345 from andrewor14/show-memory-default. (cherry picked from commit 662bb9667669cb07cf6d2ccee0d8e76bb561cd89) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/831f78ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/831f78ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/831f78ee Branch: refs/heads/branch-1.5 Commit: 831f78ee5d2deed9b529214b2613c7e972453514 Parents: 43dcf95 Author: Andrew Or Authored: Mon Aug 24 14:10:50 2015 -0700 Committer: Yin Huai Committed: Mon Aug 24 14:11:03 2015 -0700 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++ core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/831f78ee/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index fb4556b..4adc659 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // if we find that it's okay. private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000) - private val displayPeakExecutionMemory = -parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true) def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { @@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable( desc: Boolean) extends PagedTable[TaskTableRowData] { // We only track peak memory used for unsafe operators - private val displayPeakExecutionMemory = -conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true) override def tableId: String = "task-table" http://git-wip-us.apache.org/repos/asf/spark/blob/831f78ee/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 98f9314..3388c6d 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { test("peak execution memory only displayed if unsafe is enabled") { val unsafeConf = "spark.sql.unsafe.enabled" -val conf = new SparkConf().set(unsafeConf, "true") +val conf = new SparkConf(false).set(unsafeConf, "true") val html = renderStagePage(conf).toString().toLowerCase val targetString = "peak execution memory" assert(html.contains(targetString)) // Disable unsafe and make sure it's not there -val conf2 = new SparkConf().set(unsafeConf, "false") +val 
conf2 = new SparkConf(false).set(unsafeConf, "false") val html2 = renderStagePage(conf2).toString().toLowerCase assert(!html2.contains(targetString)) +// Avoid setting anything; it should be displayed by default +val conf3 = new SparkConf(false) +val html3 = renderStagePage(conf3).toString().toLowerCase +assert(html3.contains(targetString)) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10144] [UI] Actually show peak execution memory by default
Repository: spark Updated Branches: refs/heads/master 9ce0c7ad3 -> 662bb9667 [SPARK-10144] [UI] Actually show peak execution memory by default The peak execution memory metric was introduced in SPARK-8735. That was before Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` must be explicitly set to true. The result is that the memory is not displayed by default. Author: Andrew Or Closes #8345 from andrewor14/show-memory-default. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/662bb966 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/662bb966 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/662bb966 Branch: refs/heads/master Commit: 662bb9667669cb07cf6d2ccee0d8e76bb561cd89 Parents: 9ce0c7a Author: Andrew Or Authored: Mon Aug 24 14:10:50 2015 -0700 Committer: Yin Huai Committed: Mon Aug 24 14:10:50 2015 -0700 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++ core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/662bb966/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index fb4556b..4adc659 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // if we find that it's okay. private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000) - private val displayPeakExecutionMemory = -parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true) def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { @@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable( desc: Boolean) extends PagedTable[TaskTableRowData] { // We only track peak memory used for unsafe operators - private val displayPeakExecutionMemory = -conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true) override def tableId: String = "task-table" http://git-wip-us.apache.org/repos/asf/spark/blob/662bb966/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 98f9314..3388c6d 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { test("peak execution memory only displayed if unsafe is enabled") { val unsafeConf = "spark.sql.unsafe.enabled" -val conf = new SparkConf().set(unsafeConf, "true") +val conf = new SparkConf(false).set(unsafeConf, "true") val html = renderStagePage(conf).toString().toLowerCase val targetString = "peak execution memory" assert(html.contains(targetString)) // Disable unsafe and make sure it's not there -val conf2 = new SparkConf().set(unsafeConf, "false") +val conf2 = new SparkConf(false).set(unsafeConf, "false") val html2 = 
renderStagePage(conf2).toString().toLowerCase assert(!html2.contains(targetString)) +// Avoid setting anything; it should be displayed by default +val conf3 = new SparkConf(false) +val html3 = renderStagePage(conf3).toString().toLowerCase +assert(html3.contains(targetString)) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
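[Editor's note] The one-line fix is easy to gloss over, so here is a minimal, self-contained sketch (runnable with spark-core on the classpath; `new SparkConf(false)` skips loading system properties, the same trick the updated test uses) showing why the metric was hidden before: with the key unset, the old Option-based read evaluates to false, while the new read falls back to a default of true.

import org.apache.spark.SparkConf

// Sketch of the before/after config reads from the patch above.
object PeakMemoryDefaultDemo extends App {
  val unsafeConf = "spark.sql.unsafe.enabled"
  val conf = new SparkConf(false)   // key deliberately left unset

  // Old behaviour: None.exists(_.toBoolean) == false, so the column never showed up by default.
  val oldDisplay = conf.getOption(unsafeConf).exists(_.toBoolean)
  // New behaviour: fall back to true unless the user explicitly disables it.
  val newDisplay = conf.getBoolean(unsafeConf, true)

  assert(!oldDisplay && newDisplay)
  println(s"old read: $oldDisplay, new read: $newDisplay")
}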
spark git commit: [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases
Repository: spark Updated Branches: refs/heads/master 662bb9667 -> a2f4cdceb [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add new test cases. Hit two bugs, SPARK-10177 and HIVE-11625, while working on this, added test cases for them and marked as ignored for now. SPARK-10177 will be addressed in a separate PR. Author: Cheng Lian Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2f4cdce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2f4cdce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2f4cdce Branch: refs/heads/master Commit: a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2 Parents: 662bb96 Author: Cheng Lian Authored: Mon Aug 24 14:11:19 2015 -0700 Committer: Davies Liu Committed: Mon Aug 24 14:11:19 2015 -0700 -- .../hive/ParquetHiveCompatibilitySuite.scala| 132 +-- 1 file changed, 93 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2f4cdce/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 13452e7..bc30180 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,15 +17,17 @@ package org.apache.spark.sql.hive +import java.sql.Timestamp +import java.util.{Locale, TimeZone} + import org.apache.hadoop.hive.conf.HiveConf +import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.{Row, SQLConf, SQLContext} -class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { - import ParquetCompatibilityTest.makeNullable - +class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll { override def _sqlContext: SQLContext = TestHive private val sqlContext = _sqlContext @@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { */ private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) - test("Read Parquet file generated by parquet-hive") { + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + + protected override def beforeAll(): Unit = { +TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) +Locale.setDefault(Locale.US) + } + + override protected def afterAll(): Unit = { +TimeZone.setDefault(originalTimeZone) +Locale.setDefault(originalLocale) + } + + override protected def logParquetSchema(path: String): Unit = { +val schema = readParquetSchema(path, { path => + !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir) +}) + +logInfo( + s"""Schema of the Parquet file written by parquet-avro: + |$schema + """.stripMargin) + } + + private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = { withTable("parquet_compat") { withTempPath { dir => val path = dir.getCanonicalPath +// Hive columns are always nullable, so here we append a all-null row. 
+val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil + +// Don't convert Hive metastore Parquet tables to let Hive write those Parquet files. withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { withTempTable("data") { -sqlContext.sql( +val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" } + +val ddl = s"""CREATE TABLE parquet_compat( - | bool_column BOOLEAN, - | byte_column TINYINT, - | short_column SMALLINT, - | int_column INT, - | long_column BIGINT, - | float_column FLOAT, - | double_column DOUBLE, - | - | strings_column ARRAY, - | int_to_string_column MAP + |${fields.mkString(",\n")} |) |STORED AS PARQUET |LOCATION '$path' + """.stripMargin + +logInfo( + s"""Cr
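[Editor's note] With the refactor in place, adding a new compatibility case reduces to one call of the helper whose signature appears above. The fragment below is illustrative only: it assumes it sits inside the refactored ParquetHiveCompatibilitySuite (so test, Row and testParquetHiveCompatibility are in scope), and the row values and Hive column types are made up rather than taken from the commit.

// Inside ParquetHiveCompatibilitySuite -- an illustrative new case, not from the patch.
test("illustrative primitive round-trip") {
  testParquetHiveCompatibility(
    Row(true, 1.toByte, 2.toShort, 3, 4L, 5.5f, 6.5d, "foo"),
    "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
}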
spark git commit: [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases
Repository: spark Updated Branches: refs/heads/branch-1.5 831f78ee5 -> d36f3517c [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add new test cases. Hit two bugs, SPARK-10177 and HIVE-11625, while working on this, added test cases for them and marked as ignored for now. SPARK-10177 will be addressed in a separate PR. Author: Cheng Lian Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests. (cherry picked from commit a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d36f3517 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d36f3517 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d36f3517 Branch: refs/heads/branch-1.5 Commit: d36f3517c8ddd8f9b5f05d0634dc2d49448200d9 Parents: 831f78e Author: Cheng Lian Authored: Mon Aug 24 14:11:19 2015 -0700 Committer: Davies Liu Committed: Mon Aug 24 14:11:30 2015 -0700 -- .../hive/ParquetHiveCompatibilitySuite.scala| 132 +-- 1 file changed, 93 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d36f3517/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 13452e7..bc30180 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,15 +17,17 @@ package org.apache.spark.sql.hive +import java.sql.Timestamp +import java.util.{Locale, TimeZone} + import org.apache.hadoop.hive.conf.HiveConf +import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.{Row, SQLConf, SQLContext} -class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { - import ParquetCompatibilityTest.makeNullable - +class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll { override def _sqlContext: SQLContext = TestHive private val sqlContext = _sqlContext @@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { */ private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) - test("Read Parquet file generated by parquet-hive") { + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + + protected override def beforeAll(): Unit = { +TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) +Locale.setDefault(Locale.US) + } + + override protected def afterAll(): Unit = { +TimeZone.setDefault(originalTimeZone) +Locale.setDefault(originalLocale) + } + + override protected def logParquetSchema(path: String): Unit = { +val schema = readParquetSchema(path, { path => + !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir) +}) + +logInfo( + s"""Schema of the Parquet file written by parquet-avro: + |$schema + """.stripMargin) + } + + private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = { withTable("parquet_compat") { withTempPath { dir => val path = dir.getCanonicalPath 
+// Hive columns are always nullable, so here we append a all-null row. +val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil + +// Don't convert Hive metastore Parquet tables to let Hive write those Parquet files. withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { withTempTable("data") { -sqlContext.sql( +val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" } + +val ddl = s"""CREATE TABLE parquet_compat( - | bool_column BOOLEAN, - | byte_column TINYINT, - | short_column SMALLINT, - | int_column INT, - | long_column BIGINT, - | float_column FLOAT, - | double_column DOUBLE, - | - | strings_column ARRAY, - | int_to_string_column MAP + |${fields.mkString(",\n")} |) |STORED AS PARQUET
spark git commit: [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?
Repository: spark Updated Branches: refs/heads/branch-1.5 d36f3517c -> 92234439d [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package? Move `test.org.apache.spark.sql.hive` package tests to apparent intended `org.apache.spark.sql.hive` as they don't intend to test behavior from outside org.apache.spark.* Alternate take, per discussion at https://github.com/apache/spark/pull/8051 I think this is what vanzin and I had in mind but also CC rxin to cross-check, as this does indeed depend on whether these tests were accidentally in this package or not. Testing from a `test.org.apache.spark` package is legitimate but didn't seem to be the intent here. Author: Sean Owen Closes #8307 from srowen/SPARK-9758. (cherry picked from commit cb2d2e15844d7ae34b5dd7028b55e11586ed93fa) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92234439 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92234439 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92234439 Branch: refs/heads/branch-1.5 Commit: 92234439d86044a3ec9f198c3b13ec20c763393d Parents: d36f351 Author: Sean Owen Authored: Mon Aug 24 22:35:21 2015 +0100 Committer: Sean Owen Committed: Mon Aug 24 22:35:31 2015 +0100 -- .../spark/sql/hive/JavaDataFrameSuite.java | 104 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 162 ++ .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 +++ .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 ++ .../sql/hive/execution/UDFIntegerToString.java | 26 +++ .../sql/hive/execution/UDFListListInt.java | 47 ++ .../spark/sql/hive/execution/UDFListString.java | 38 + .../sql/hive/execution/UDFStringString.java | 26 +++ .../sql/hive/execution/UDFTwoListList.java | 28 .../spark/sql/hive/JavaDataFrameSuite.java | 106 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 163 --- .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 --- .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 -- .../sql/hive/execution/UDFIntegerToString.java | 26 --- .../sql/hive/execution/UDFListListInt.java | 47 -- .../spark/sql/hive/execution/UDFListString.java | 38 - .../sql/hive/execution/UDFStringString.java | 26 --- .../sql/hive/execution/UDFTwoListList.java | 28 .../hive/execution/AggregationQuerySuite.scala | 2 +- 19 files changed, 679 insertions(+), 682 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92234439/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java -- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java new file mode 100644 index 000..019d8a3 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import static org.apache.spark.sql.functions.*; +import org.apache.spark.sql.hive.test.TestHive$; +import org.apache.spark.sql.hive.aggregate.MyDoubleSum; + +public class JavaDataFrameSuite { + private transient JavaSparkContext sc; + private transient HiveContext hc; + + DataFrame df; + + private void checkAnswer(DataFrame actual, List expected) { +String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); +if (errorMessage != null) { + Assert.fai
spark git commit: [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package?
Repository: spark Updated Branches: refs/heads/master a2f4cdceb -> cb2d2e158 [SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package? Move `test.org.apache.spark.sql.hive` package tests to apparent intended `org.apache.spark.sql.hive` as they don't intend to test behavior from outside org.apache.spark.* Alternate take, per discussion at https://github.com/apache/spark/pull/8051 I think this is what vanzin and I had in mind but also CC rxin to cross-check, as this does indeed depend on whether these tests were accidentally in this package or not. Testing from a `test.org.apache.spark` package is legitimate but didn't seem to be the intent here. Author: Sean Owen Closes #8307 from srowen/SPARK-9758. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb2d2e15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb2d2e15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb2d2e15 Branch: refs/heads/master Commit: cb2d2e15844d7ae34b5dd7028b55e11586ed93fa Parents: a2f4cdc Author: Sean Owen Authored: Mon Aug 24 22:35:21 2015 +0100 Committer: Sean Owen Committed: Mon Aug 24 22:35:21 2015 +0100 -- .../spark/sql/hive/JavaDataFrameSuite.java | 104 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 162 ++ .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 +++ .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 ++ .../sql/hive/execution/UDFIntegerToString.java | 26 +++ .../sql/hive/execution/UDFListListInt.java | 47 ++ .../spark/sql/hive/execution/UDFListString.java | 38 + .../sql/hive/execution/UDFStringString.java | 26 +++ .../sql/hive/execution/UDFTwoListList.java | 28 .../spark/sql/hive/JavaDataFrameSuite.java | 106 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 163 --- .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 --- .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 -- .../sql/hive/execution/UDFIntegerToString.java | 26 --- .../sql/hive/execution/UDFListListInt.java | 47 -- .../spark/sql/hive/execution/UDFListString.java | 38 - .../sql/hive/execution/UDFStringString.java | 26 --- .../sql/hive/execution/UDFTwoListList.java | 28 .../hive/execution/AggregationQuerySuite.scala | 2 +- 19 files changed, 679 insertions(+), 682 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java -- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java new file mode 100644 index 000..019d8a3 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import static org.apache.spark.sql.functions.*; +import org.apache.spark.sql.hive.test.TestHive$; +import org.apache.spark.sql.hive.aggregate.MyDoubleSum; + +public class JavaDataFrameSuite { + private transient JavaSparkContext sc; + private transient HiveContext hc; + + DataFrame df; + + private void checkAnswer(DataFrame actual, List expected) { +String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); +if (errorMessage != null) { + Assert.fail(errorMessage); +} + } + + @Before + public void setUp() throws IOException { +hc = TestHive
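As background for the package move above, a small hedged sketch (all names below are hypothetical) of what the two locations imply: a suite compiled into org.apache.spark.sql.hive can reach package-private internals, while a suite under test.org.apache.spark.sql.hive is meant to prove that the public API alone suffices, which is not what these Hive suites were trying to do.

    package org.apache.spark.sql.hive

    // Hypothetical internal member: visible to code and tests in this package only.
    private[hive] object InternalHelper {
      def internalDescription: String = "reachable from same-package tests"
    }

    // A test living in the same package can exercise such internals directly.
    class SamePackageVisibilityCheck {
      def check(): String = InternalHelper.internalDescription
    }

    // The equivalent call from a class under test.org.apache.spark.sql.hive would not
    // compile, which is why that package is reserved for deliberately "external" API tests.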
spark git commit: [SPARK-10061] [DOC] ML ensemble docs
Repository: spark Updated Branches: refs/heads/branch-1.5 92234439d -> aadb9de4c [SPARK-10061] [DOC] ML ensemble docs User guide for spark.ml GBTs and Random Forests. The examples are copied from the decision tree guide and modified to run. I caught some issues I had somehow missed in the tree guide as well. I have run all examples, including Java ones. (Of course, I thought I had previously as well...) CC: mengxr manishamde yanboliang Author: Joseph K. Bradley Closes #8369 from jkbradley/ml-ensemble-docs. (cherry picked from commit 13db11cb08eb90eb0ea3402c9fe0270aa282f247) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aadb9de4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aadb9de4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aadb9de4 Branch: refs/heads/branch-1.5 Commit: aadb9de4ce81db420cac0400b6de5bcc82c4ebe4 Parents: 9223443 Author: Joseph K. Bradley Authored: Mon Aug 24 15:38:54 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 24 15:39:01 2015 -0700 -- docs/ml-decision-tree.md | 75 ++-- docs/ml-ensembles.md | 952 +- 2 files changed, 976 insertions(+), 51 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aadb9de4/docs/ml-decision-tree.md -- diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md index 958c6f5..542819e 100644 --- a/docs/ml-decision-tree.md +++ b/docs/ml-decision-tree.md @@ -30,7 +30,7 @@ The Pipelines API for Decision Trees offers a bit more functionality than the or Ensembles of trees (Random Forests and Gradient-Boosted Trees) are described in the [Ensembles guide](ml-ensembles.html). -# Inputs and Outputs (Predictions) +# Inputs and Outputs We list the input and output (prediction) column types here. All output columns are optional; to exclude an output column, set its corresponding Param to an empty string. @@ -234,7 +234,7 @@ IndexToString labelConverter = new IndexToString() // Chain indexers and tree in a Pipeline Pipeline pipeline = new Pipeline() - .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter}); + .setStages(new PipelineStage[] {labelIndexer, featureIndexer, dt, labelConverter}); // Train model. This also runs the indexers. PipelineModel model = pipeline.fit(trainingData); @@ -315,10 +315,13 @@ print treeModel # summary only ## Regression +The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use a feature transformer to index categorical features, adding metadata to the `DataFrame` which the Decision Tree algorithm can recognize. + -More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier). +More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.regression.DecisionTreeRegressor). 
{% highlight scala %} import org.apache.spark.ml.Pipeline @@ -347,7 +350,7 @@ val dt = new DecisionTreeRegressor() .setLabelCol("label") .setFeaturesCol("indexedFeatures") -// Chain indexers and tree in a Pipeline +// Chain indexer and tree in a Pipeline val pipeline = new Pipeline() .setStages(Array(featureIndexer, dt)) @@ -365,9 +368,7 @@ val evaluator = new RegressionEvaluator() .setLabelCol("label") .setPredictionCol("prediction") .setMetricName("rmse") -// We negate the RMSE value since RegressionEvalutor returns negated RMSE -// (since evaluation metrics are meant to be maximized by CrossValidator). -val rmse = - evaluator.evaluate(predictions) +val rmse = evaluator.evaluate(predictions) println("Root Mean Squared Error (RMSE) on test data = " + rmse) val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel] @@ -377,14 +378,15 @@ println("Learned regression tree model:\n" + treeModel.toDebugString) -More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html). +More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/regression/DecisionTreeRegressor.html). {% highlight java %} import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineModel; import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.evaluation.RegressionEvaluator; -import org.apache.spark.ml.feature.*; +import org.apache.spark.ml.feature.VectorIndexer; +import org.apache.spark.ml.feature.VectorIndexerModel; import org.apache.spark.ml.regression.DecisionTreeR
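The regression walkthroughs touched above all start from the same preparation; a minimal sketch of it, assuming spark-shell (an existing `sc` and `sqlContext`) and the sample LibSVM file shipped with Spark:

    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.mllib.util.MLUtils
    import sqlContext.implicits._

    // Load LibSVM data as a DataFrame with "label" and "features" columns.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()

    // Index categorical features; features with more than 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)
      .fit(data)

    // Hold out 30% of the rows for evaluation.
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))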
spark git commit: [SPARK-10061] [DOC] ML ensemble docs
Repository: spark Updated Branches: refs/heads/master cb2d2e158 -> 13db11cb0 [SPARK-10061] [DOC] ML ensemble docs User guide for spark.ml GBTs and Random Forests. The examples are copied from the decision tree guide and modified to run. I caught some issues I had somehow missed in the tree guide as well. I have run all examples, including Java ones. (Of course, I thought I had previously as well...) CC: mengxr manishamde yanboliang Author: Joseph K. Bradley Closes #8369 from jkbradley/ml-ensemble-docs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13db11cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13db11cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13db11cb Branch: refs/heads/master Commit: 13db11cb08eb90eb0ea3402c9fe0270aa282f247 Parents: cb2d2e1 Author: Joseph K. Bradley Authored: Mon Aug 24 15:38:54 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 24 15:38:54 2015 -0700 -- docs/ml-decision-tree.md | 75 ++-- docs/ml-ensembles.md | 952 +- 2 files changed, 976 insertions(+), 51 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/13db11cb/docs/ml-decision-tree.md -- diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md index 958c6f5..542819e 100644 --- a/docs/ml-decision-tree.md +++ b/docs/ml-decision-tree.md @@ -30,7 +30,7 @@ The Pipelines API for Decision Trees offers a bit more functionality than the or Ensembles of trees (Random Forests and Gradient-Boosted Trees) are described in the [Ensembles guide](ml-ensembles.html). -# Inputs and Outputs (Predictions) +# Inputs and Outputs We list the input and output (prediction) column types here. All output columns are optional; to exclude an output column, set its corresponding Param to an empty string. @@ -234,7 +234,7 @@ IndexToString labelConverter = new IndexToString() // Chain indexers and tree in a Pipeline Pipeline pipeline = new Pipeline() - .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter}); + .setStages(new PipelineStage[] {labelIndexer, featureIndexer, dt, labelConverter}); // Train model. This also runs the indexers. PipelineModel model = pipeline.fit(trainingData); @@ -315,10 +315,13 @@ print treeModel # summary only ## Regression +The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. +We use a feature transformer to index categorical features, adding metadata to the `DataFrame` which the Decision Tree algorithm can recognize. + -More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier). +More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.regression.DecisionTreeRegressor). {% highlight scala %} import org.apache.spark.ml.Pipeline @@ -347,7 +350,7 @@ val dt = new DecisionTreeRegressor() .setLabelCol("label") .setFeaturesCol("indexedFeatures") -// Chain indexers and tree in a Pipeline +// Chain indexer and tree in a Pipeline val pipeline = new Pipeline() .setStages(Array(featureIndexer, dt)) @@ -365,9 +368,7 @@ val evaluator = new RegressionEvaluator() .setLabelCol("label") .setPredictionCol("prediction") .setMetricName("rmse") -// We negate the RMSE value since RegressionEvalutor returns negated RMSE -// (since evaluation metrics are meant to be maximized by CrossValidator). 
-val rmse = - evaluator.evaluate(predictions) +val rmse = evaluator.evaluate(predictions) println("Root Mean Squared Error (RMSE) on test data = " + rmse) val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel] @@ -377,14 +378,15 @@ println("Learned regression tree model:\n" + treeModel.toDebugString) -More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html). +More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/regression/DecisionTreeRegressor.html). {% highlight java %} import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.PipelineModel; import org.apache.spark.ml.PipelineStage; import org.apache.spark.ml.evaluation.RegressionEvaluator; -import org.apache.spark.ml.feature.*; +import org.apache.spark.ml.feature.VectorIndexer; +import org.apache.spark.ml.feature.VectorIndexerModel; import org.apache.spark.ml.regression.DecisionTreeRegressionModel; import org.apache.spark.ml.regression.DecisionTreeRegressor; import org.apache.spark.mllib
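The new ensembles guide follows the same shape with GBTs and Random Forests in place of a single tree; a compact sketch of a GBT regression pipeline, reusing the `featureIndexer`, `trainingData` and `testData` from the preparation sketch above:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

    val gbt = new GBTRegressor()
      .setLabelCol("label")
      .setFeaturesCol("indexedFeatures")
      .setMaxIter(10)

    // Chain the feature indexer and the GBT in a Pipeline, train, and predict.
    val pipeline = new Pipeline().setStages(Array(featureIndexer, gbt))
    val model = pipeline.fit(trainingData)
    val predictions = model.transform(testData)

    // Evaluate with RMSE on the held-out data.
    val rmse = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
      .evaluate(predictions)
    println("Root Mean Squared Error (RMSE) on test data = " + rmse)

    val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
    println("Learned GBT regression model:\n" + gbtModel.toDebugString)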
spark git commit: [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter
Repository: spark Updated Branches: refs/heads/branch-1.5 aadb9de4c -> a4bad5f25 [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter This adds a missing null check to the Decimal `toScala` converter in `CatalystTypeConverters`, fixing an NPE. Author: Josh Rosen Closes #8401 from JoshRosen/SPARK-10190. (cherry picked from commit d7b4c095271c36fcc7f9ded267ecf5ec66fac803) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4bad5f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4bad5f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4bad5f2 Branch: refs/heads/branch-1.5 Commit: a4bad5f25ed41821e36ecab23ec686fcb6071deb Parents: aadb9de Author: Josh Rosen Authored: Mon Aug 24 16:17:45 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 16:17:52 2015 -0700 -- .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 5 - .../apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a4bad5f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 8d0c64e..966623e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -329,7 +329,10 @@ object CatalystTypeConverters { null } } -override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal +override def toScala(catalystValue: Decimal): JavaBigDecimal = { + if (catalystValue == null) null + else catalystValue.toJavaBigDecimal +} override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal = row.getDecimal(column, dataType.precision, dataType.scale).toJavaBigDecimal } http://git-wip-us.apache.org/repos/asf/spark/blob/a4bad5f2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index df0f045..03bb102 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -32,7 +32,9 @@ class CatalystTypeConvertersSuite extends SparkFunSuite { IntegerType, LongType, FloatType, -DoubleType) +DoubleType, +DecimalType.SYSTEM_DEFAULT, +DecimalType.USER_DEFAULT) test("null handling in rows") { val schema = StructType(simpleTypes.map(t => StructField(t.getClass.getName, t))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter
Repository: spark Updated Branches: refs/heads/master 13db11cb0 -> d7b4c0952 [SPARK-10190] Fix NPE in CatalystTypeConverters Decimal toScala converter This adds a missing null check to the Decimal `toScala` converter in `CatalystTypeConverters`, fixing an NPE. Author: Josh Rosen Closes #8401 from JoshRosen/SPARK-10190. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7b4c095 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7b4c095 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7b4c095 Branch: refs/heads/master Commit: d7b4c095271c36fcc7f9ded267ecf5ec66fac803 Parents: 13db11c Author: Josh Rosen Authored: Mon Aug 24 16:17:45 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 16:17:45 2015 -0700 -- .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 5 - .../apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7b4c095/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 8d0c64e..966623e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -329,7 +329,10 @@ object CatalystTypeConverters { null } } -override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal +override def toScala(catalystValue: Decimal): JavaBigDecimal = { + if (catalystValue == null) null + else catalystValue.toJavaBigDecimal +} override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal = row.getDecimal(column, dataType.precision, dataType.scale).toJavaBigDecimal } http://git-wip-us.apache.org/repos/asf/spark/blob/d7b4c095/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala index df0f045..03bb102 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala @@ -32,7 +32,9 @@ class CatalystTypeConvertersSuite extends SparkFunSuite { IntegerType, LongType, FloatType, -DoubleType) +DoubleType, +DecimalType.SYSTEM_DEFAULT, +DecimalType.USER_DEFAULT) test("null handling in rows") { val schema = StructType(simpleTypes.map(t => StructField(t.getClass.getName, t))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
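A hedged illustration of the case the new null check covers, assuming spark-shell (an existing `sc` and `sqlContext`): collecting a row whose decimal column is null used to hit the unguarded `toScala` call.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(StructField("d", DecimalType(10, 2), nullable = true)))
    val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(null))), schema)

    // Converting the null Decimal back to a java.math.BigDecimal previously threw a
    // NullPointerException; with the null check it simply yields a null field.
    df.collect()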
spark git commit: HOTFIX: Adding missing 1.4.1 ec2 version
Repository: spark Updated Branches: refs/heads/branch-1.5 a4bad5f25 -> 8ca8bdd01 HOTFIX: Adding missing 1.4.1 ec2 version Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca8bdd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca8bdd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca8bdd0 Branch: refs/heads/branch-1.5 Commit: 8ca8bdd015c53ff0c4705886545fc30eef8b8359 Parents: a4bad5f Author: Patrick Wendell Authored: Mon Aug 24 17:22:09 2015 -0700 Committer: Patrick Wendell Committed: Mon Aug 24 17:22:09 2015 -0700 -- ec2/spark_ec2.py | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8ca8bdd0/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index ccc897f..3a2361c 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -71,6 +71,7 @@ VALID_SPARK_VERSIONS = set([ "1.3.0", "1.3.1", "1.4.0", +"1.4.1", "1.5.0" ]) @@ -85,6 +86,7 @@ SPARK_TACHYON_MAP = { "1.3.0": "0.5.0", "1.3.1": "0.5.0", "1.4.0": "0.6.4", +"1.4.1": "0.6.4", "1.5.0": "0.7.1" } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10165] [SQL] Await child resolution in ResolveFunctions
Repository: spark Updated Branches: refs/heads/master d7b4c0952 -> 2bf338c62 [SPARK-10165] [SQL] Await child resolution in ResolveFunctions Currently, we eagerly attempt to resolve functions, even before their children are resolved. However, this is not valid in cases where we need to know the types of the input arguments (i.e. when resolving Hive UDFs). As a fix, this PR delays function resolution until the functions children are resolved. This change also necessitates a change to the way we resolve aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or `ORDER BY` clauses). Specifically, we can't assume that these misplaced functions will be resolved, allowing us to differentiate aggregate functions from normal functions. To compensate for this change we now attempt to resolve these unresolved expressions in the context of the aggregate operator, before checking to see if any aggregate expressions are present. Author: Michael Armbrust Closes #8371 from marmbrus/hiveUDFResolution. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bf338c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bf338c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bf338c6 Branch: refs/heads/master Commit: 2bf338c626e9d97ccc033cfadae8b36a82c66fd1 Parents: d7b4c09 Author: Michael Armbrust Authored: Mon Aug 24 18:10:51 2015 -0700 Committer: Michael Armbrust Committed: Mon Aug 24 18:10:51 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 116 --- .../spark/sql/hive/execution/HiveUDFSuite.scala | 5 + 2 files changed, 77 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2bf338c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d0eb9c2..1a5de15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -78,7 +78,7 @@ class Analyzer( ResolveAliases :: ExtractWindowExpressions :: GlobalAggregates :: - UnresolvedHavingClauseAttributes :: + ResolveAggregateFunctions :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, @@ -452,37 +452,6 @@ class Analyzer( logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}") s // Nothing we can do here. Return original plan. } - case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child)) - if !s.resolved && a.resolved => -// A small hack to create an object that will allow us to resolve any references that -// refer to named expressions that are present in the grouping expressions. -val groupingRelation = LocalRelation( - grouping.collect { case ne: NamedExpression => ne.toAttribute } -) - -// Find sort attributes that are projected away so we can temporarily add them back in. -val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, groupingRelation) - -// Find aggregate expressions and evaluate them early, since they can't be evaluated in a -// Sort. 
-val (withAggsRemoved, aliasedAggregateList) = newOrdering.map { - case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty => -val aliased = Alias(aggOrdering.child, "_aggOrdering")() -(aggOrdering.copy(child = aliased.toAttribute), Some(aliased)) - - case other => (other, None) -}.unzip - -val missing = missingAttr ++ aliasedAggregateList.flatten - -if (missing.nonEmpty) { - // Add missing grouping exprs and then project them away after the sort. - Project(a.output, -Sort(withAggsRemoved, global, - Aggregate(grouping, aggs ++ missing, child))) -} else { - s // Nothing we can do here. Return original plan. -} } /** @@ -515,6 +484,7 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case q: LogicalPlan => q transformExpressions { + case u if !u.childrenResolved => u // Skip until children are resolved. case u @ UnresolvedFunction(name, children, isDistinct) => withPosition(u) { registry.lookupFunction(name, children) match { @@ -559,21 +529,79 @@ class Analyzer( }
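A hedged sketch of a query shape affected by the second part of this change, assuming a `HiveContext` named `hiveContext` and a `src(key INT, value STRING)` table like the one in Spark's Hive test fixtures: the `max(value)` aggregate appears only in the HAVING clause, so it is resolved against the Aggregate operator rather than eagerly as an ordinary function.

    val df = hiveContext.sql(
      """SELECT key, count(*) AS cnt
        |FROM src
        |GROUP BY key
        |HAVING max(value) > 'val_100'
      """.stripMargin)
    df.explain()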
spark git commit: [SPARK-10165] [SQL] Await child resolution in ResolveFunctions
Repository: spark Updated Branches: refs/heads/branch-1.5 8ca8bdd01 -> 228e429eb [SPARK-10165] [SQL] Await child resolution in ResolveFunctions Currently, we eagerly attempt to resolve functions, even before their children are resolved. However, this is not valid in cases where we need to know the types of the input arguments (i.e. when resolving Hive UDFs). As a fix, this PR delays function resolution until the functions children are resolved. This change also necessitates a change to the way we resolve aggregate expressions that are not in aggregate operators (e.g., in `HAVING` or `ORDER BY` clauses). Specifically, we can't assume that these misplaced functions will be resolved, allowing us to differentiate aggregate functions from normal functions. To compensate for this change we now attempt to resolve these unresolved expressions in the context of the aggregate operator, before checking to see if any aggregate expressions are present. Author: Michael Armbrust Closes #8371 from marmbrus/hiveUDFResolution. (cherry picked from commit 2bf338c626e9d97ccc033cfadae8b36a82c66fd1) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/228e429e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/228e429e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/228e429e Branch: refs/heads/branch-1.5 Commit: 228e429ebf1f367de9087f74cf3ff43bbd32f382 Parents: 8ca8bdd Author: Michael Armbrust Authored: Mon Aug 24 18:10:51 2015 -0700 Committer: Michael Armbrust Committed: Mon Aug 24 18:11:04 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 116 --- .../spark/sql/hive/execution/HiveUDFSuite.scala | 5 + 2 files changed, 77 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/228e429e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d0eb9c2..1a5de15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -78,7 +78,7 @@ class Analyzer( ResolveAliases :: ExtractWindowExpressions :: GlobalAggregates :: - UnresolvedHavingClauseAttributes :: + ResolveAggregateFunctions :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, @@ -452,37 +452,6 @@ class Analyzer( logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}") s // Nothing we can do here. Return original plan. } - case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child)) - if !s.resolved && a.resolved => -// A small hack to create an object that will allow us to resolve any references that -// refer to named expressions that are present in the grouping expressions. -val groupingRelation = LocalRelation( - grouping.collect { case ne: NamedExpression => ne.toAttribute } -) - -// Find sort attributes that are projected away so we can temporarily add them back in. -val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, groupingRelation) - -// Find aggregate expressions and evaluate them early, since they can't be evaluated in a -// Sort. 
-val (withAggsRemoved, aliasedAggregateList) = newOrdering.map { - case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty => -val aliased = Alias(aggOrdering.child, "_aggOrdering")() -(aggOrdering.copy(child = aliased.toAttribute), Some(aliased)) - - case other => (other, None) -}.unzip - -val missing = missingAttr ++ aliasedAggregateList.flatten - -if (missing.nonEmpty) { - // Add missing grouping exprs and then project them away after the sort. - Project(a.output, -Sort(withAggsRemoved, global, - Aggregate(grouping, aggs ++ missing, child))) -} else { - s // Nothing we can do here. Return original plan. -} } /** @@ -515,6 +484,7 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case q: LogicalPlan => q transformExpressions { + case u if !u.childrenResolved => u // Skip until children are resolved. case u @ UnresolvedFunction(name, children, isDistinct) => withPosition(u)
spark git commit: [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release
Repository: spark Updated Branches: refs/heads/master 2bf338c62 -> 6511bf559 [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release cc: shivaram ## Summary - Modify `tdname` of expression functions. i.e. `ascii`: `rdname functions` => `rdname ascii` - Replace the dynamical function definitions to the static ones because of thir documentations. ## Generated PDF File https://drive.google.com/file/d/0B9biIZIU47lLX2t6ZjRoRnBTSEU/view?usp=sharing ## JIRA [[SPARK-10118] Improve SparkR API docs for 1.5 release - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-10118) Author: Yu ISHIKAWA Author: Yuu ISHIKAWA Closes #8386 from yu-iskw/SPARK-10118. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6511bf55 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6511bf55 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6511bf55 Branch: refs/heads/master Commit: 6511bf559b736d8e23ae398901c8d78938e66869 Parents: 2bf338c Author: Yu ISHIKAWA Authored: Mon Aug 24 18:17:51 2015 -0700 Committer: Shivaram Venkataraman Committed: Mon Aug 24 18:17:51 2015 -0700 -- R/create-docs.sh|2 +- R/pkg/R/column.R|5 +- R/pkg/R/functions.R | 1603 ++ R/pkg/R/generics.R | 214 +++ 4 files changed, 1596 insertions(+), 228 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/create-docs.sh -- diff --git a/R/create-docs.sh b/R/create-docs.sh index 6a4687b..d2ae160 100755 --- a/R/create-docs.sh +++ b/R/create-docs.sh @@ -39,7 +39,7 @@ pushd $FWDIR mkdir -p pkg/html pushd pkg/html -Rscript -e 'library(SparkR, lib.loc="../../lib"); library(knitr); knit_rd("SparkR")' +Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); library(knitr); knit_rd("SparkR", links = tools::findHTMLlinks(paste(libDir, "SparkR", sep="/")))' popd http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/pkg/R/column.R -- diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 5a07ebd..a1f50c3 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -169,8 +169,7 @@ setMethod("between", signature(x = "Column"), #' #' @rdname column #' -#' @examples -#' \dontrun{ +#' @examples \dontrun{ #' cast(df$age, "string") #' cast(df$name, list(type="array", elementType="byte", containsNull = TRUE)) #' } @@ -192,7 +191,7 @@ setMethod("cast", #' #' @rdname column #' @return a matched values as a result of comparing with given values. 
-#' \dontrun{ +#' @examples \dontrun{ #' filter(df, "age in (10, 30)") #' where(df, df$age %in% c(10, 30)) #' } http://git-wip-us.apache.org/repos/asf/spark/blob/6511bf55/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index b5879bd..d848730 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -18,69 +18,1298 @@ #' @include generics.R column.R NULL -#' @title S4 expression functions for DataFrame column(s) -#' @description These are expression functions on DataFrame columns - -functions1 <- c( - "abs", "acos", "approxCountDistinct", "ascii", "asin", "atan", - "avg", "base64", "bin", "bitwiseNOT", "cbrt", "ceil", "cos", "cosh", "count", - "crc32", "dayofmonth", "dayofyear", "exp", "explode", "expm1", "factorial", - "first", "floor", "hex", "hour", "initcap", "isNaN", "last", "last_day", - "length", "log", "log10", "log1p", "log2", "lower", "ltrim", "max", "md5", - "mean", "min", "minute", "month", "negate", "quarter", "reverse", - "rint", "round", "rtrim", "second", "sha1", "signum", "sin", "sinh", "size", - "soundex", "sqrt", "sum", "sumDistinct", "tan", "tanh", "toDegrees", - "toRadians", "to_date", "trim", "unbase64", "unhex", "upper", "weekofyear", - "year") -functions2 <- c( - "atan2", "datediff", "hypot", "levenshtein", "months_between", "nanvl", "pmod") - -createFunction1 <- function(name) { - setMethod(name, -signature(x = "Column"), -function(x) { - jc <- callJStatic("org.apache.spark.sql.functions", name, x@jc) - column(jc) -}) -} - -createFunction2 <- function(name) { - setMethod(name, -signature(y = "Column"), -function(y, x) { - if (class(x) == "Column") { -x <- x@jc - } - jc <- callJStatic("org.apache.spark.sql.functions", name, y@jc, x) - column(jc) -}) -} +#' Creates a \code{Column} of literal value. +#' +#' The passed in object is returned directly if it is already a \linkS4class{Column}. +#' If the object is a Scala Symbol, it
spark git commit: [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release
Repository: spark Updated Branches: refs/heads/branch-1.5 228e429eb -> ec5d09c0f [SPARK-10118] [SPARKR] [DOCS] Improve SparkR API docs for 1.5 release cc: shivaram ## Summary - Modify `tdname` of expression functions. i.e. `ascii`: `rdname functions` => `rdname ascii` - Replace the dynamical function definitions to the static ones because of thir documentations. ## Generated PDF File https://drive.google.com/file/d/0B9biIZIU47lLX2t6ZjRoRnBTSEU/view?usp=sharing ## JIRA [[SPARK-10118] Improve SparkR API docs for 1.5 release - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-10118) Author: Yu ISHIKAWA Author: Yuu ISHIKAWA Closes #8386 from yu-iskw/SPARK-10118. (cherry picked from commit 6511bf559b736d8e23ae398901c8d78938e66869) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec5d09c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec5d09c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec5d09c0 Branch: refs/heads/branch-1.5 Commit: ec5d09c0f0f1f61d6d80a35adaba3a8102184740 Parents: 228e429 Author: Yu ISHIKAWA Authored: Mon Aug 24 18:17:51 2015 -0700 Committer: Shivaram Venkataraman Committed: Mon Aug 24 18:17:58 2015 -0700 -- R/create-docs.sh|2 +- R/pkg/R/column.R|5 +- R/pkg/R/functions.R | 1603 ++ R/pkg/R/generics.R | 214 +++ 4 files changed, 1596 insertions(+), 228 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/create-docs.sh -- diff --git a/R/create-docs.sh b/R/create-docs.sh index 6a4687b..d2ae160 100755 --- a/R/create-docs.sh +++ b/R/create-docs.sh @@ -39,7 +39,7 @@ pushd $FWDIR mkdir -p pkg/html pushd pkg/html -Rscript -e 'library(SparkR, lib.loc="../../lib"); library(knitr); knit_rd("SparkR")' +Rscript -e 'libDir <- "../../lib"; library(SparkR, lib.loc=libDir); library(knitr); knit_rd("SparkR", links = tools::findHTMLlinks(paste(libDir, "SparkR", sep="/")))' popd http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/pkg/R/column.R -- diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 5a07ebd..a1f50c3 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -169,8 +169,7 @@ setMethod("between", signature(x = "Column"), #' #' @rdname column #' -#' @examples -#' \dontrun{ +#' @examples \dontrun{ #' cast(df$age, "string") #' cast(df$name, list(type="array", elementType="byte", containsNull = TRUE)) #' } @@ -192,7 +191,7 @@ setMethod("cast", #' #' @rdname column #' @return a matched values as a result of comparing with given values. 
-#' \dontrun{ +#' @examples \dontrun{ #' filter(df, "age in (10, 30)") #' where(df, df$age %in% c(10, 30)) #' } http://git-wip-us.apache.org/repos/asf/spark/blob/ec5d09c0/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index b5879bd..d848730 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -18,69 +18,1298 @@ #' @include generics.R column.R NULL -#' @title S4 expression functions for DataFrame column(s) -#' @description These are expression functions on DataFrame columns - -functions1 <- c( - "abs", "acos", "approxCountDistinct", "ascii", "asin", "atan", - "avg", "base64", "bin", "bitwiseNOT", "cbrt", "ceil", "cos", "cosh", "count", - "crc32", "dayofmonth", "dayofyear", "exp", "explode", "expm1", "factorial", - "first", "floor", "hex", "hour", "initcap", "isNaN", "last", "last_day", - "length", "log", "log10", "log1p", "log2", "lower", "ltrim", "max", "md5", - "mean", "min", "minute", "month", "negate", "quarter", "reverse", - "rint", "round", "rtrim", "second", "sha1", "signum", "sin", "sinh", "size", - "soundex", "sqrt", "sum", "sumDistinct", "tan", "tanh", "toDegrees", - "toRadians", "to_date", "trim", "unbase64", "unhex", "upper", "weekofyear", - "year") -functions2 <- c( - "atan2", "datediff", "hypot", "levenshtein", "months_between", "nanvl", "pmod") - -createFunction1 <- function(name) { - setMethod(name, -signature(x = "Column"), -function(x) { - jc <- callJStatic("org.apache.spark.sql.functions", name, x@jc) - column(jc) -}) -} - -createFunction2 <- function(name) { - setMethod(name, -signature(y = "Column"), -function(y, x) { - if (class(x) == "Column") { -x <- x@jc - } - jc <- callJStatic("org.apache.spark.sql.functions", name, y@jc, x) - column(jc) -}) -} +#' Creates a \code{Column} of literal value. +#' +#' The pass
spark git commit: [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products
Repository: spark Updated Branches: refs/heads/branch-1.5 ec5d09c0f -> 2f7e4b416 [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products * Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with `SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is essentially a wrapper for the latter * Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any `RDD[Product]`, not just case classes Author: Feynman Liang Closes #8406 from feynmanliang/sql-doc-fixes. (cherry picked from commit 642c43c81c835139e3f35dfd6a215d668a474203) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f7e4b41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f7e4b41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f7e4b41 Branch: refs/heads/branch-1.5 Commit: 2f7e4b416492ff3c2ea7fcab05b57ed9f0c6e45b Parents: ec5d09c Author: Feynman Liang Authored: Mon Aug 24 19:45:41 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 19:45:48 2015 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f7e4b41/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 126c9c6..a1eea09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -350,7 +350,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: - * Creates a DataFrame from an RDD of case classes. + * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples). * * @group dataframes * @since 1.3.0 http://git-wip-us.apache.org/repos/asf/spark/blob/2f7e4b41/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 47b6f80..bf03c61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -40,7 +40,7 @@ private[sql] abstract class SQLImplicits { implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) /** - * Creates a DataFrame from an RDD of case classes or tuples. + * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples). * @since 1.3.0 */ implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products
Repository: spark Updated Branches: refs/heads/master 6511bf559 -> 642c43c81 [SQL] [MINOR] [DOC] Clarify docs for inferring DataFrame from RDD of Products * Makes `SQLImplicits.rddToDataFrameHolder` scaladoc consistent with `SQLContext.createDataFrame[A <: Product](rdd: RDD[A])` since the former is essentially a wrapper for the latter * Clarifies `createDataFrame[A <: Product]` scaladoc to apply for any `RDD[Product]`, not just case classes Author: Feynman Liang Closes #8406 from feynmanliang/sql-doc-fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/642c43c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/642c43c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/642c43c8 Branch: refs/heads/master Commit: 642c43c81c835139e3f35dfd6a215d668a474203 Parents: 6511bf5 Author: Feynman Liang Authored: Mon Aug 24 19:45:41 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 19:45:41 2015 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/642c43c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 126c9c6..a1eea09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -350,7 +350,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * :: Experimental :: - * Creates a DataFrame from an RDD of case classes. + * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples). * * @group dataframes * @since 1.3.0 http://git-wip-us.apache.org/repos/asf/spark/blob/642c43c8/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 47b6f80..bf03c61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -40,7 +40,7 @@ private[sql] abstract class SQLImplicits { implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name) /** - * Creates a DataFrame from an RDD of case classes or tuples. + * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples). * @since 1.3.0 */ implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
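A minimal sketch of the two code paths the clarified scaladoc covers, assuming spark-shell (an existing `sc` and `sqlContext`): any `RDD[Product]` qualifies, tuples included, not only case classes.

    import sqlContext.implicits._

    case class Person(name: String, age: Int)

    // SQLContext.createDataFrame on an RDD of case classes...
    val people = sqlContext.createDataFrame(
      sc.parallelize(Seq(Person("alice", 30), Person("bob", 25))))

    // ...and the rddToDataFrameHolder implicit on an RDD of tuples, which are also Products.
    val pairs = sc.parallelize(Seq(("carol", 41), ("dave", 19))).toDF("name", "age")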
spark git commit: [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state
Repository: spark Updated Branches: refs/heads/master 642c43c81 -> a0c0aae1d [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state https://issues.apache.org/jira/browse/SPARK-10121 Looks like the problem is that if we add a jar through another thread, the thread handling the JDBC session will not get the latest classloader. Author: Yin Huai Closes #8368 from yhuai/SPARK-10121. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0c0aae1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0c0aae1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0c0aae1 Branch: refs/heads/master Commit: a0c0aae1defe5e1e57704065631d201f8e3f6bac Parents: 642c43c Author: Yin Huai Authored: Tue Aug 25 12:49:50 2015 +0800 Committer: Cheng Lian Committed: Tue Aug 25 12:49:50 2015 +0800 -- .../SparkExecuteStatementOperation.scala| 6 +++ .../thriftserver/HiveThriftServer2Suites.scala | 54 2 files changed, 60 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0c0aae1/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 833bf62..02cc7e5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -159,6 +159,12 @@ private[hive] class SparkExecuteStatementOperation( // User information is part of the metastore client member in Hive hiveContext.setSession(currentSqlSession) + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = +hiveContext.executionHive.state.getConf.getClassLoader + sessionHive.getConf.setClassLoader(executionHiveClassLoader) + parentSessionState.getConf.setClassLoader(executionHiveClassLoader) + Hive.set(sessionHive) SessionState.setCurrentSessionState(parentSessionState) try { http://git-wip-us.apache.org/repos/asf/spark/blob/a0c0aae1/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ded42bc..b72249b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -377,6 +377,60 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { rs2.close() } } + + test("test add jar") { +withMultipleConnectionJdbcStatement( + { +statement => + val jarFile = +"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar" + .split("/") + .mkString(File.separator) + + statement.executeQuery(s"ADD JAR $jarFile") + }, + + { +statement => + val queries = Seq( +"DROP TABLE IF EXISTS smallKV", +"CREATE TABLE smallKV(key INT, val STRING)", +s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV", +"DROP TABLE IF EXISTS addJar", +"""CREATE TABLE addJar(key string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' +""".stripMargin) + + queries.foreach(statement.execute) + + statement.executeQuery( +""" + |INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1 +""".stripMargin) + + val actualResult = +statement.executeQuery("SELECT key FROM addJar") + val actualResultBuffer = new collection.mutable.ArrayBuffer[String]() + while (actualResult.next()) { +actualResultBuffer += actualResult.getString(1) + } + actualResult.close() + + val expectedResult = +statement.executeQuery("SELECT 'k1'") + val expectedResultBuffer = ne
spark git commit: [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state
Repository: spark Updated Branches: refs/heads/branch-1.5 2f7e4b416 -> c99f4160b [SPARK-10121] [SQL] Thrift server always use the latest class loader provided by the conf of executionHive's state https://issues.apache.org/jira/browse/SPARK-10121 Looks like the problem is that if we add a jar through another thread, the thread handling the JDBC session will not get the latest classloader. Author: Yin Huai Closes #8368 from yhuai/SPARK-10121. (cherry picked from commit a0c0aae1defe5e1e57704065631d201f8e3f6bac) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c99f4160 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c99f4160 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c99f4160 Branch: refs/heads/branch-1.5 Commit: c99f4160b98bc0685c23fee4eb7b892c47f6feda Parents: 2f7e4b4 Author: Yin Huai Authored: Tue Aug 25 12:49:50 2015 +0800 Committer: Cheng Lian Committed: Tue Aug 25 12:50:44 2015 +0800 -- .../SparkExecuteStatementOperation.scala| 6 +++ .../thriftserver/HiveThriftServer2Suites.scala | 54 2 files changed, 60 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c99f4160/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 833bf62..02cc7e5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -159,6 +159,12 @@ private[hive] class SparkExecuteStatementOperation( // User information is part of the metastore client member in Hive hiveContext.setSession(currentSqlSession) + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = +hiveContext.executionHive.state.getConf.getClassLoader + sessionHive.getConf.setClassLoader(executionHiveClassLoader) + parentSessionState.getConf.setClassLoader(executionHiveClassLoader) + Hive.set(sessionHive) SessionState.setCurrentSessionState(parentSessionState) try { http://git-wip-us.apache.org/repos/asf/spark/blob/c99f4160/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ded42bc..b72249b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -377,6 +377,60 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { rs2.close() } } + + test("test add jar") { +withMultipleConnectionJdbcStatement( + { +statement => + val jarFile = +"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar" + .split("/") + .mkString(File.separator) + + statement.executeQuery(s"ADD JAR $jarFile") + }, + + { +statement => + val queries = Seq( +"DROP TABLE IF EXISTS smallKV", +"CREATE TABLE smallKV(key INT, val STRING)", +s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV", +"DROP TABLE IF EXISTS addJar", +"""CREATE TABLE addJar(key string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' +""".stripMargin) + + queries.foreach(statement.execute) + + statement.executeQuery( +""" + |INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1 +""".stripMargin) + + val actualResult = +statement.executeQuery("SELECT key FROM addJar") + val actualResultBuffer = new collection.mutable.ArrayBuffer[String]() + while (actualResult.next()) { +actualResultBuffer += actualResult.getString(1) + } + actualResult.close() + + val exp
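A hedged sketch of the failure mode the new test guards against, seen from the JDBC side (it assumes a Thrift server running on localhost:10000, the Hive JDBC driver on the classpath, and an illustrative jar path):

    import java.sql.DriverManager

    val url = "jdbc:hive2://localhost:10000/default"

    // Session 1 adds a jar that contains a SerDe class.
    val c1 = DriverManager.getConnection(url, "user", "")
    c1.createStatement().execute("ADD JAR /tmp/hive-hcatalog-core-0.13.1.jar")

    // Session 2 is served by a different thread. Before this fix that thread could keep a
    // stale classloader and fail to load the SerDe; it now picks up the latest classloader
    // from executionHive's state.
    val c2 = DriverManager.getConnection(url, "user", "")
    c2.createStatement().execute(
      "CREATE TABLE addJar(key STRING) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'")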
spark git commit: [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables
Repository: spark Updated Branches: refs/heads/master a0c0aae1d -> 5175ca0c8 [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables In `HiveComparisionTest`s it is possible to fail a query of the form `SELECT * FROM dest1`, where `dest1` is the query that is actually computing the incorrect results. To aid debugging this patch improves the harness to also print these query plans and their results. Author: Michael Armbrust Closes #8388 from marmbrus/generatedTables. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5175ca0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5175ca0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5175ca0c Branch: refs/heads/master Commit: 5175ca0c85b10045d12c3fb57b1e52278a413ecf Parents: a0c0aae Author: Michael Armbrust Authored: Mon Aug 24 23:15:27 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 23:15:27 2015 -0700 -- .../sql/hive/execution/HiveComparisonTest.scala | 36 1 file changed, 36 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5175ca0c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 2bdb0e1..4d45249 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution import java.io._ +import scala.util.control.NonFatal + import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} import org.apache.spark.{Logging, SparkFunSuite} @@ -386,11 +388,45 @@ abstract class HiveComparisonTest hiveCacheFiles.foreach(_.delete()) } + // If this query is reading other tables that were created during this test run + // also print out the query plans and results for those. + val computedTablesMessages: String = try { +val tablesRead = new TestHive.QueryExecution(query).executedPlan.collect { + case ts: HiveTableScan => ts.relation.tableName +}.toSet + +TestHive.reset() +val executions = queryList.map(new TestHive.QueryExecution(_)) +executions.foreach(_.toRdd) +val tablesGenerated = queryList.zip(executions).flatMap { + case (q, e) => e.executedPlan.collect { +case i: InsertIntoHiveTable if tablesRead contains i.table.tableName => + (q, e, i) + } +} + +tablesGenerated.map { case (hiveql, execution, insert) => + s""" + |=== Generated Table === + |$hiveql + |$execution + |== Results == + |${insert.child.execute().collect().mkString("\n")} + """.stripMargin +}.mkString("\n") + + } catch { +case NonFatal(e) => + logError("Failed to compute generated tables", e) + s"Couldn't compute dependent tables: $e" + } + val errorMessage = s""" |Results do not match for $testCaseName: |$hiveQuery\n${hiveQuery.analyzed.output.map(_.name).mkString("\t")} |$resultComparison + |$computedTablesMessages """.stripMargin stringToFile(new File(wrongDirectory, testCaseName), errorMessage + consoleTestCase) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables
Repository: spark Updated Branches: refs/heads/branch-1.5 c99f4160b -> 2239a2036 [SPARK-10178] [SQL] HiveComparisionTest should print out dependent tables In `HiveComparisionTest`s it is possible to fail a query of the form `SELECT * FROM dest1`, where `dest1` is the query that is actually computing the incorrect results. To aid debugging this patch improves the harness to also print these query plans and their results. Author: Michael Armbrust Closes #8388 from marmbrus/generatedTables. (cherry picked from commit 5175ca0c85b10045d12c3fb57b1e52278a413ecf) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2239a203 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2239a203 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2239a203 Branch: refs/heads/branch-1.5 Commit: 2239a20368b7833ffe0059941478924c7be87bbe Parents: c99f416 Author: Michael Armbrust Authored: Mon Aug 24 23:15:27 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 24 23:15:34 2015 -0700 -- .../sql/hive/execution/HiveComparisonTest.scala | 36 1 file changed, 36 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2239a203/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 2bdb0e1..4d45249 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution import java.io._ +import scala.util.control.NonFatal + import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} import org.apache.spark.{Logging, SparkFunSuite} @@ -386,11 +388,45 @@ abstract class HiveComparisonTest hiveCacheFiles.foreach(_.delete()) } + // If this query is reading other tables that were created during this test run + // also print out the query plans and results for those. + val computedTablesMessages: String = try { +val tablesRead = new TestHive.QueryExecution(query).executedPlan.collect { + case ts: HiveTableScan => ts.relation.tableName +}.toSet + +TestHive.reset() +val executions = queryList.map(new TestHive.QueryExecution(_)) +executions.foreach(_.toRdd) +val tablesGenerated = queryList.zip(executions).flatMap { + case (q, e) => e.executedPlan.collect { +case i: InsertIntoHiveTable if tablesRead contains i.table.tableName => + (q, e, i) + } +} + +tablesGenerated.map { case (hiveql, execution, insert) => + s""" + |=== Generated Table === + |$hiveql + |$execution + |== Results == + |${insert.child.execute().collect().mkString("\n")} + """.stripMargin +}.mkString("\n") + + } catch { +case NonFatal(e) => + logError("Failed to compute generated tables", e) + s"Couldn't compute dependent tables: $e" + } + val errorMessage = s""" |Results do not match for $testCaseName: |$hiveQuery\n${hiveQuery.analyzed.output.map(_.name).mkString("\t")} |$resultComparison + |$computedTablesMessages """.stripMargin stringToFile(new File(wrongDirectory, testCaseName), errorMessage + consoleTestCase) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
Repository: spark Updated Branches: refs/heads/master 5175ca0c8 -> d9c25dec8 [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with default maxRatePerPartition setting of 0 Author: cody koeninger Closes #8413 from koeninger/backpressure-testing-master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c25dec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c25dec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c25dec Branch: refs/heads/master Commit: d9c25dec87e6da7d66a47ff94e7eefa008081b9d Parents: 5175ca0 Author: cody koeninger Authored: Mon Aug 24 23:26:14 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 23:26:14 2015 -0700 -- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d9c25dec/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707..194 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - .getOrElse(maxRateLimitPerPartition) + .map { limit => +if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) +} else { + limit / numPartitions +} + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
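The behavioural change is easiest to see with the rate-selection logic pulled out into a standalone function. The sketch below mirrors the new mapping from the diff; the function name and the simplified Long types are illustrative, not the actual DStream code.

```
// Mirrors the patched selection of the per-partition rate limit.
def effectiveRate(
    estimatedRateLimit: Option[Long],  // backpressure estimate, if any
    maxRateLimitPerPartition: Long,    // spark.streaming.kafka.maxRatePerPartition (0 = unlimited)
    numPartitions: Int): Long =
  estimatedRateLimit
    .filter(_ > 0)
    .map { limit =>
      if (maxRateLimitPerPartition > 0) {
        math.min(maxRateLimitPerPartition, limit / numPartitions)
      } else {
        limit / numPartitions            // fix: keep the estimate when no cap is configured
      }
    }
    .getOrElse(maxRateLimitPerPartition)

// With the default cap of 0 and a backpressure estimate of 500 records/sec over
// 5 partitions: effectiveRate(Some(500L), 0L, 5) == 100. The old code computed
// min(0, 100) == 0, which then failed the downstream `> 0` guard and left the
// batch unbounded, so the backpressure signal was effectively ignored.
```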
spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
Repository: spark Updated Branches: refs/heads/branch-1.5 2239a2036 -> 88991dc4f [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with default maxRatePerPartition setting of 0 Author: cody koeninger Closes #8413 from koeninger/backpressure-testing-master. (cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4 Branch: refs/heads/branch-1.5 Commit: 88991dc4f04b0c88466c6eab5ada43506c981341 Parents: 2239a20 Author: cody koeninger Authored: Mon Aug 24 23:26:14 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 23:26:27 2015 -0700 -- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707..194 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - .getOrElse(maxRateLimitPerPartition) + .map { limit => +if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) +} else { + limit / numPartitions +} + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results
Repository: spark Updated Branches: refs/heads/branch-1.5 88991dc4f -> bb1357f36 [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results This PR fixes the following cases for `ReceiverSchedulingPolicy`. 1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1). Let's assume r1 starts at first on `host1` as `scheduleReceivers` suggested, and try to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected since r1 is starting exactly where `scheduleReceivers` suggested. This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`. 2) Assume there are 3 executors (host1, host2, host3) and each executors has 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting, the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle. This issue can be fixed by returning only executors that have the minimum wight rather than returning at least 3 executors. Author: zsxwing Closes #8340 from zsxwing/fix-receiver-scheduling. (cherry picked from commit f023aa2fcc1d1dbb82aee568be0a8f2457c309ae) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1357f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1357f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1357f3 Branch: refs/heads/branch-1.5 Commit: bb1357f362cdd96b854c2a0a193496ce709cdbdd Parents: 88991dc Author: zsxwing Authored: Mon Aug 24 23:34:50 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 23:35:02 2015 -0700 -- .../scheduler/ReceiverSchedulingPolicy.scala| 58 +++--- .../streaming/scheduler/ReceiverTracker.scala | 106 --- .../ReceiverSchedulingPolicySuite.scala | 13 +-- 3 files changed, 120 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb1357f3/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index ef5b687..10b5a7f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -22,6 +22,36 @@ import scala.collection.mutable import org.apache.spark.streaming.receiver.Receiver +/** + * A class that tries to schedule receivers with evenly distributed. There are two phases for + * scheduling receivers. + * + * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule + * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase. + * It will try to schedule receivers with evenly distributed. 
ReceiverTracker should update its + * receiverTrackingInfoMap according to the results of `scheduleReceivers`. + * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that + * contains the scheduled locations. Then when a receiver is starting, it will send a register + * request and `ReceiverTracker.registerReceiver` will be called. In + * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check + * if the location of this receiver is one of the scheduled executors, if not, the register will + * be rejected. + * - The second phase is local scheduling when a receiver is restarting. There are two cases of + * receiver restarting: + * - If a receiver is restarting because it's rejected due to the real location and the scheduled + * executors mismatching, in other words, it fails to start in one of the locations that + * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are + * still alive in the list of scheduled executors, then use them to launch the receiver job. + * - If a rec
spark git commit: [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results
Repository: spark Updated Branches: refs/heads/master d9c25dec8 -> f023aa2fc [SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results This PR fixes the following cases for `ReceiverSchedulingPolicy`. 1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1). Let's assume r1 starts at first on `host1` as `scheduleReceivers` suggested, and try to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected since r1 is starting exactly where `scheduleReceivers` suggested. This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`. 2) Assume there are 3 executors (host1, host2, host3) and each executors has 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting, the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle. This issue can be fixed by returning only executors that have the minimum wight rather than returning at least 3 executors. Author: zsxwing Closes #8340 from zsxwing/fix-receiver-scheduling. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f023aa2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f023aa2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f023aa2f Branch: refs/heads/master Commit: f023aa2fcc1d1dbb82aee568be0a8f2457c309ae Parents: d9c25de Author: zsxwing Authored: Mon Aug 24 23:34:50 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 24 23:34:50 2015 -0700 -- .../scheduler/ReceiverSchedulingPolicy.scala| 58 +++--- .../streaming/scheduler/ReceiverTracker.scala | 106 --- .../ReceiverSchedulingPolicySuite.scala | 13 +-- 3 files changed, 120 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f023aa2f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index ef5b687..10b5a7f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -22,6 +22,36 @@ import scala.collection.mutable import org.apache.spark.streaming.receiver.Receiver +/** + * A class that tries to schedule receivers with evenly distributed. There are two phases for + * scheduling receivers. + * + * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule + * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase. + * It will try to schedule receivers with evenly distributed. 
ReceiverTracker should update its + * receiverTrackingInfoMap according to the results of `scheduleReceivers`. + * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that + * contains the scheduled locations. Then when a receiver is starting, it will send a register + * request and `ReceiverTracker.registerReceiver` will be called. In + * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check + * if the location of this receiver is one of the scheduled executors, if not, the register will + * be rejected. + * - The second phase is local scheduling when a receiver is restarting. There are two cases of + * receiver restarting: + * - If a receiver is restarting because it's rejected due to the real location and the scheduled + * executors mismatching, in other words, it fails to start in one of the locations that + * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are + * still alive in the list of scheduled executors, then use them to launch the receiver job. + * - If a receiver is restarting without a scheduled executors list, or the executors in the list + * are dead, `Rec
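As a quick illustration of the second fix described above, rescheduling a single receiver now only offers the executors that currently carry the minimum scheduling weight, rather than padding the candidate list to a fixed size. A hedged sketch of that idea (the names are illustrative, not the actual `ReceiverSchedulingPolicy` code):

```
// Given current executor weights, return only the least-loaded executors.
def leastLoadedExecutors(executorWeights: Map[String, Double]): Seq[String] =
  if (executorWeights.isEmpty) {
    Seq.empty
  } else {
    val minWeight = executorWeights.values.min
    executorWeights.collect {
      case (executor, weight) if weight == minWeight => executor
    }.toSeq
  }

// e.g. Map("host1" -> 1.0, "host2" -> 0.5, "host3" -> 0.5) yields only host2 and
// host3, so TaskScheduler cannot pile a restarting receiver onto host1 while
// less-loaded executors exist.
```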
spark git commit: [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.
Repository: spark Updated Branches: refs/heads/master f023aa2fc -> df7041d02 [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON. https://issues.apache.org/jira/browse/SPARK-10196 Author: Yin Huai Closes #8408 from yhuai/DecimalJsonSPARK-10196. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7041d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7041d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7041d0 Branch: refs/heads/master Commit: df7041d02d3fd44b08a859f5d77bf6fb726895f0 Parents: f023aa2 Author: Yin Huai Authored: Mon Aug 24 23:38:32 2015 -0700 Committer: Davies Liu Committed: Mon Aug 24 23:38:32 2015 -0700 -- .../datasources/json/JacksonGenerator.scala | 2 +- .../sql/sources/JsonHadoopFsRelationSuite.scala | 27 2 files changed, 28 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/df7041d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala index 99ac773..330ba90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala @@ -95,7 +95,7 @@ private[sql] object JacksonGenerator { case (FloatType, v: Float) => gen.writeNumber(v) case (DoubleType, v: Double) => gen.writeNumber(v) case (LongType, v: Long) => gen.writeNumber(v) - case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v) + case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal) case (ByteType, v: Byte) => gen.writeNumber(v.toInt) case (BinaryType, v: Array[Byte]) => gen.writeBinary(v) case (BooleanType, v: Boolean) => gen.writeBoolean(v) http://git-wip-us.apache.org/repos/asf/spark/blob/df7041d0/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index ed6d512..8ca3a17 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import java.math.BigDecimal + import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -75,4 +77,29 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { ) } } + + test("SPARK-10196: save decimal type to JSON") { +withTempDir { file => + file.delete() + + val schema = +new StructType() + .add("decimal", DecimalType(7, 2)) + + val data = +Row(new BigDecimal("10.02")) :: + Row(new BigDecimal("2.99")) :: + Row(new BigDecimal("1")) :: Nil + val df = createDataFrame(sparkContext.parallelize(data), schema) + + // Write the data out. + df.write.format(dataSourceName).save(file.getCanonicalPath) + + // Read it back and check the result. + checkAnswer( +read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), +df + ) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
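The one-line change above matters because values inside Spark's internal rows are carried as `org.apache.spark.sql.types.Decimal`, not `java.math.BigDecimal`, so the old pattern never matched and writing a decimal column to JSON fell through to the conversion error. A minimal sketch of the type mismatch (variable names are illustrative):

```
import org.apache.spark.sql.types.Decimal

val v: Any = Decimal(BigDecimal("10.02"))  // what an internal row actually holds

val rendered = v match {
  case d: Decimal => d.toJavaBigDecimal.toPlainString  // patched branch: matches
  case d: java.math.BigDecimal => d.toPlainString      // old branch: never reached here
  case other => sys.error(s"Failed to convert value $other")
}
// rendered == "10.02"
```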
spark git commit: [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON.
Repository: spark Updated Branches: refs/heads/branch-1.5 bb1357f36 -> 0b425ed3d [SPARK-10196] [SQL] Correctly saving decimals in internal rows to JSON. https://issues.apache.org/jira/browse/SPARK-10196 Author: Yin Huai Closes #8408 from yhuai/DecimalJsonSPARK-10196. (cherry picked from commit df7041d02d3fd44b08a859f5d77bf6fb726895f0) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b425ed3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b425ed3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b425ed3 Branch: refs/heads/branch-1.5 Commit: 0b425ed3d55f7e9c39a259ce4b8d86a41a7bd403 Parents: bb1357f Author: Yin Huai Authored: Mon Aug 24 23:38:32 2015 -0700 Committer: Davies Liu Committed: Mon Aug 24 23:38:42 2015 -0700 -- .../datasources/json/JacksonGenerator.scala | 2 +- .../sql/sources/JsonHadoopFsRelationSuite.scala | 27 2 files changed, 28 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0b425ed3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala index 99ac773..330ba90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala @@ -95,7 +95,7 @@ private[sql] object JacksonGenerator { case (FloatType, v: Float) => gen.writeNumber(v) case (DoubleType, v: Double) => gen.writeNumber(v) case (LongType, v: Long) => gen.writeNumber(v) - case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v) + case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal) case (ByteType, v: Byte) => gen.writeNumber(v.toInt) case (BinaryType, v: Array[Byte]) => gen.writeBinary(v) case (BooleanType, v: Boolean) => gen.writeBoolean(v) http://git-wip-us.apache.org/repos/asf/spark/blob/0b425ed3/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index ed6d512..8ca3a17 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import java.math.BigDecimal + import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -75,4 +77,29 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { ) } } + + test("SPARK-10196: save decimal type to JSON") { +withTempDir { file => + file.delete() + + val schema = +new StructType() + .add("decimal", DecimalType(7, 2)) + + val data = +Row(new BigDecimal("10.02")) :: + Row(new BigDecimal("2.99")) :: + Row(new BigDecimal("1")) :: Nil + val df = createDataFrame(sparkContext.parallelize(data), schema) + + // Write the data out. + df.write.format(dataSourceName).save(file.getCanonicalPath) + + // Read it back and check the result. 
+ checkAnswer( +read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), +df + ) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10136] [SQL] A more robust fix for SPARK-10136
Repository: spark Updated Branches: refs/heads/master df7041d02 -> bf03fe68d [SPARK-10136] [SQL] A more robust fix for SPARK-10136 PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root cause. The real problem can be rather tricky to explain, and requires audiences to be pretty familiar with parquet-format spec, especially details of `LIST` backwards-compatibility rules. Let me have a try to give an explanation here. The structure of the problematic Parquet schema generated by parquet-avro is something like this: ``` message m { group f (LIST) { // Level 1 repeated group array (LIST) { // Level 2 repeated array; // Level 3 } } } ``` (The schema generated by parquet-thrift is structurally similar, just replace the `array` at level 2 with `f_tuple`, and the other one at level 3 with `f_tuple_tuple`.) This structure consists of two nested legacy 2-level `LIST`-like structures: 1. The repeated group type at level 2 is the element type of the outer array defined at level 1 This group should map to an `CatalystArrayConverter.ElementConverter` when building converters. 2. The repeated primitive type at level 3 is the element type of the inner array defined at level 2 This group should also map to an `CatalystArrayConverter.ElementConverter`. The root cause of SPARK-10136 is that, the group at level 2 isn't properly recognized as the element type of level 1. Thus, according to parquet-format spec, the repeated primitive at level 3 is left as a so called "unannotated repeated primitive type", and is recognized as a required list of required primitive type, thus a `RepeatedPrimitiveConverter` instead of a `CatalystArrayConverter.ElementConverter` is created for it. According to parquet-format spec, unannotated repeated type shouldn't appear in a `LIST`- or `MAP`-annotated group. PR #8341 fixed this issue by allowing such unannotated repeated type appear in `LIST`-annotated groups, which is a non-standard, hacky, but valid fix. (I didn't realize this when authoring #8341 though.) As for the reason why level 2 isn't recognized as a list element type, it's because of the following `LIST` backwards-compatibility rule defined in the parquet-format spec: > If the repeated field is a group with one field and is named either `array` > or uses the `LIST`-annotated group's name with `_tuple` appended then the > repeated type is the element type and elements are required. (The `array` part is for parquet-avro compatibility, while the `_tuple` part is for parquet-thrift.) This rule is implemented in [`CatalystSchemaConverter.isElementType`] [1], but neglected in [`CatalystRowConverter.isElementType`] [2]. This PR delivers a more robust fix by adding this rule in the latter method. Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found at [PARQUET-364] [3]. [1]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305 [2]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463 [3]: https://issues.apache.org/jira/browse/PARQUET-364 Author: Cheng Lian Closes #8361 from liancheng/spark-10136/proper-version. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf03fe68 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf03fe68 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf03fe68 Branch: refs/heads/master Commit: bf03fe68d62f33dda70dff45c3bda1f57b032dfc Parents: df7041d Author: Cheng Lian Authored: Tue Aug 25 14:58:42 2015 +0800 Committer: Cheng Lian Committed: Tue Aug 25 14:58:42 2015 +0800 -- .../parquet/CatalystRowConverter.scala| 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bf03fe68/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index d2c2db5..cbf0704 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -415,8 +415,9 @@ private[parquet] class CatalystRowConverter( private val elementConve
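For reference, the backwards-compatibility rule quoted above boils down to a small predicate over the repeated field. The sketch below mirrors the shape of `CatalystSchemaConverter.isElementType` against parquet-mr's schema API; it is a simplified illustration, not the exact implementation.

```
import org.apache.parquet.schema.Type

// The repeated field under a LIST group is itself the element type (legacy
// 2-level layout) when it is primitive, has more than one field, or carries
// one of the two legacy names; otherwise it is the wrapper group of the
// standard 3-level layout and the real element sits one level deeper.
def isElementType(repeatedType: Type, parentName: String): Boolean =
  repeatedType.isPrimitive ||
    repeatedType.asGroupType().getFieldCount > 1 ||
    repeatedType.getName == "array" ||              // parquet-avro (< 1.6.0)
    repeatedType.getName == s"${parentName}_tuple"  // parquet-thrift
```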
spark git commit: [SPARK-10136] [SQL] A more robust fix for SPARK-10136
Repository: spark Updated Branches: refs/heads/branch-1.5 0b425ed3d -> 95a14e9f2 [SPARK-10136] [SQL] A more robust fix for SPARK-10136 PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root cause. The real problem can be rather tricky to explain, and requires audiences to be pretty familiar with parquet-format spec, especially details of `LIST` backwards-compatibility rules. Let me have a try to give an explanation here. The structure of the problematic Parquet schema generated by parquet-avro is something like this: ``` message m { group f (LIST) { // Level 1 repeated group array (LIST) { // Level 2 repeated array; // Level 3 } } } ``` (The schema generated by parquet-thrift is structurally similar, just replace the `array` at level 2 with `f_tuple`, and the other one at level 3 with `f_tuple_tuple`.) This structure consists of two nested legacy 2-level `LIST`-like structures: 1. The repeated group type at level 2 is the element type of the outer array defined at level 1 This group should map to an `CatalystArrayConverter.ElementConverter` when building converters. 2. The repeated primitive type at level 3 is the element type of the inner array defined at level 2 This group should also map to an `CatalystArrayConverter.ElementConverter`. The root cause of SPARK-10136 is that, the group at level 2 isn't properly recognized as the element type of level 1. Thus, according to parquet-format spec, the repeated primitive at level 3 is left as a so called "unannotated repeated primitive type", and is recognized as a required list of required primitive type, thus a `RepeatedPrimitiveConverter` instead of a `CatalystArrayConverter.ElementConverter` is created for it. According to parquet-format spec, unannotated repeated type shouldn't appear in a `LIST`- or `MAP`-annotated group. PR #8341 fixed this issue by allowing such unannotated repeated type appear in `LIST`-annotated groups, which is a non-standard, hacky, but valid fix. (I didn't realize this when authoring #8341 though.) As for the reason why level 2 isn't recognized as a list element type, it's because of the following `LIST` backwards-compatibility rule defined in the parquet-format spec: > If the repeated field is a group with one field and is named either `array` > or uses the `LIST`-annotated group's name with `_tuple` appended then the > repeated type is the element type and elements are required. (The `array` part is for parquet-avro compatibility, while the `_tuple` part is for parquet-thrift.) This rule is implemented in [`CatalystSchemaConverter.isElementType`] [1], but neglected in [`CatalystRowConverter.isElementType`] [2]. This PR delivers a more robust fix by adding this rule in the latter method. Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found at [PARQUET-364] [3]. [1]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305 [2]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463 [3]: https://issues.apache.org/jira/browse/PARQUET-364 Author: Cheng Lian Closes #8361 from liancheng/spark-10136/proper-version. 
(cherry picked from commit bf03fe68d62f33dda70dff45c3bda1f57b032dfc) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95a14e9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95a14e9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95a14e9f Branch: refs/heads/branch-1.5 Commit: 95a14e9f232a22548d97c8704d781d089188 Parents: 0b425ed Author: Cheng Lian Authored: Tue Aug 25 14:58:42 2015 +0800 Committer: Cheng Lian Committed: Tue Aug 25 14:58:57 2015 +0800 -- .../parquet/CatalystRowConverter.scala| 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95a14e9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index d2c2db5..cbf0704 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConve