spark git commit: [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Repository: spark Updated Branches: refs/heads/branch-1.3 01905c41e -> 281614d7c [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream Changes - Added example - Added a critical unit test that verifies that offset ranges can be recovered through checkpoints Might add more changes. Author: Tathagata Das Closes #4384 from tdas/new-kafka-fixes and squashes the following commits: 7c931c3 [Tathagata Das] Small update 3ed9284 [Tathagata Das] updated scala doc 83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example. 26df23c [Tathagata Das] Updates based on PR comments from Cody e4abf69 [Tathagata Das] Scala doc improvements and stuff. bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite 50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs. e73589c [Tathagata Das] Minor changes. 4986784 [Tathagata Das] Added unit test to kafka offset recovery 6a91cab [Tathagata Das] Added example (cherry picked from commit c15134632e74e3dee05eda20c6ef79915e15d02e) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/281614d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/281614d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/281614d7 Branch: refs/heads/branch-1.3 Commit: 281614d7cd7870dc8c140c08f15902109658360d Parents: 01905c4 Author: Tathagata Das Authored: Mon Feb 9 22:45:48 2015 -0800 Committer: Tathagata Das Committed: Mon Feb 9 22:47:09 2015 -0800 -- .../streaming/JavaDirectKafkaWordCount.java | 113 ++ .../streaming/DirectKafkaWordCount.scala| 71 .../kafka/DirectKafkaInputDStream.scala | 5 +- .../spark/streaming/kafka/KafkaCluster.scala| 3 + .../apache/spark/streaming/kafka/KafkaRDD.scala | 12 +- .../streaming/kafka/KafkaRDDPartition.scala | 23 +- .../spark/streaming/kafka/KafkaUtils.scala | 353 ++- .../apache/spark/streaming/kafka/Leader.scala | 21 +- .../spark/streaming/kafka/OffsetRange.scala | 53 ++- .../kafka/JavaDirectKafkaStreamSuite.java | 159 + .../streaming/kafka/JavaKafkaStreamSuite.java | 5 +- .../kafka/DirectKafkaStreamSuite.scala | 302 .../streaming/kafka/KafkaClusterSuite.scala | 24 +- .../kafka/KafkaDirectStreamSuite.scala | 92 - .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 +- .../streaming/kafka/KafkaStreamSuite.scala | 62 ++-- .../kafka/ReliableKafkaStreamSuite.scala| 4 +- 17 files changed, 1048 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/281614d7/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java -- diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java new file mode 100644 index 000..bab9f24 --- /dev/null +++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import kafka.serializer.StringDecoder; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.Durations; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount <brokers> <topics> + *<brokers> is a list of one or more Kafka brokers + *<topics> is a list of one or more kafka topics to consume from + * + * Example: +
spark git commit: [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Repository: spark Updated Branches: refs/heads/master ef2f55b97 -> c15134632 [SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream Changes - Added example - Added a critical unit test that verifies that offset ranges can be recovered through checkpoints Might add more changes. Author: Tathagata Das Closes #4384 from tdas/new-kafka-fixes and squashes the following commits: 7c931c3 [Tathagata Das] Small update 3ed9284 [Tathagata Das] updated scala doc 83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example. 26df23c [Tathagata Das] Updates based on PR comments from Cody e4abf69 [Tathagata Das] Scala doc improvements and stuff. bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite 50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs. e73589c [Tathagata Das] Minor changes. 4986784 [Tathagata Das] Added unit test to kafka offset recovery 6a91cab [Tathagata Das] Added example Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1513463 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1513463 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1513463 Branch: refs/heads/master Commit: c15134632e74e3dee05eda20c6ef79915e15d02e Parents: ef2f55b Author: Tathagata Das Authored: Mon Feb 9 22:45:48 2015 -0800 Committer: Tathagata Das Committed: Mon Feb 9 22:45:48 2015 -0800 -- .../streaming/JavaDirectKafkaWordCount.java | 113 ++ .../streaming/DirectKafkaWordCount.scala| 71 .../kafka/DirectKafkaInputDStream.scala | 5 +- .../spark/streaming/kafka/KafkaCluster.scala| 3 + .../apache/spark/streaming/kafka/KafkaRDD.scala | 12 +- .../streaming/kafka/KafkaRDDPartition.scala | 23 +- .../spark/streaming/kafka/KafkaUtils.scala | 353 ++- .../apache/spark/streaming/kafka/Leader.scala | 21 +- .../spark/streaming/kafka/OffsetRange.scala | 53 ++- .../kafka/JavaDirectKafkaStreamSuite.java | 159 + .../streaming/kafka/JavaKafkaStreamSuite.java | 5 +- .../kafka/DirectKafkaStreamSuite.scala | 302 .../streaming/kafka/KafkaClusterSuite.scala | 24 +- .../kafka/KafkaDirectStreamSuite.scala | 92 - .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 +- .../streaming/kafka/KafkaStreamSuite.scala | 62 ++-- .../kafka/ReliableKafkaStreamSuite.scala| 4 +- 17 files changed, 1048 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1513463/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java -- diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java new file mode 100644 index 000..bab9f24 --- /dev/null +++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import kafka.serializer.StringDecoder; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.Durations; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount <brokers> <topics> + *<brokers> is a list of one or more Kafka brokers + *<topics> is a list of one or more kafka topics to consume from + * + * Example: + *$ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 + */ + +
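For reference, the direct (receiver-less) API added here is driven through `KafkaUtils.createDirectStream`. The Scala sketch below approximates the `DirectKafkaWordCount` example listed in the diffstat; the object name, batch interval, and argument handling are illustrative choices rather than text taken from the patch.

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // e.g. brokers = "broker1-host:port,broker2-host:port", topics = "topic1,topic2"
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Receiver-less stream: offset ranges are computed and tracked by the DStream itself.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.split(",").toSet)

    // Standard word count over the message values.
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the direct stream tracks its own offset ranges instead of relying on a receiver and ZooKeeper, those ranges can be stored in checkpoints, which is what the offset-recovery unit test mentioned in the commit message exercises.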
spark git commit: [SPARK-5597][MLLIB] save/load for decision trees and ensembles
Repository: spark Updated Branches: refs/heads/branch-1.3 663d34ec8 -> 01905c41e [SPARK-5597][MLLIB] save/load for decision trees and emsembles This is based on # from jkbradley with the following changes: 1. Node schema updated to ~~~ treeId: int nodeId: Int predict/ |- predict: Double |- prob: Double impurity: Double isLeaf: Boolean split/ |- feature: Int |- threshold: Double |- featureType: Int |- categories: Array[Double] leftNodeId: Integer rightNodeId: Integer infoGain: Double ~~~ 2. Some refactor of the implementation. Closes #. Author: Joseph K. Bradley Author: Xiangrui Meng Closes #4493 from mengxr/SPARK-5597 and squashes the following commits: 75e3bb6 [Xiangrui Meng] fix style 2b0033d [Xiangrui Meng] update tree export schema and refactor the implementation 45873a2 [Joseph K. Bradley] org imports 1d4c264 [Joseph K. Bradley] Added save/load for tree ensembles dcdbf85 [Joseph K. Bradley] added save/load for decision tree but need to generalize it to ensembles (cherry picked from commit ef2f55b97f58fa06acb30e9e0172fb66fba383bc) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01905c41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01905c41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01905c41 Branch: refs/heads/branch-1.3 Commit: 01905c41e1a4f2b71f342b0e775593d30e1969e3 Parents: 663d34e Author: Joseph K. Bradley Authored: Mon Feb 9 22:09:07 2015 -0800 Committer: Xiangrui Meng Committed: Mon Feb 9 22:09:18 2015 -0800 -- .../mllib/tree/model/DecisionTreeModel.scala| 197 ++- .../mllib/tree/model/InformationGainStats.scala | 4 +- .../apache/spark/mllib/tree/model/Node.scala| 5 + .../apache/spark/mllib/tree/model/Predict.scala | 7 + .../mllib/tree/model/treeEnsembleModels.scala | 157 ++- .../spark/mllib/tree/DecisionTreeSuite.scala| 120 ++- .../mllib/tree/GradientBoostedTreesSuite.scala | 81 +--- .../spark/mllib/tree/RandomForestSuite.scala| 28 ++- 8 files changed, 561 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/01905c41/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index a25e625..89ecf37 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -17,11 +17,17 @@ package org.apache.spark.mllib.tree.model +import scala.collection.mutable + +import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} /** * :: Experimental :: @@ -31,7 +37,7 @@ import org.apache.spark.rdd.RDD * @param algo algorithm type -- classification or regression */ @Experimental -class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable { +class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable with Saveable { /** * Predict values for a single data point using the model trained. 
@@ -98,4 +104,193 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable header + topNode.subtreeToString(2) } + override def save(sc: SparkContext, path: String): Unit = { +DecisionTreeModel.SaveLoadV1_0.save(sc, path, this) + } + + override protected def formatVersion: String = "1.0" +} + +object DecisionTreeModel extends Loader[DecisionTreeModel] { + + private[tree] object SaveLoadV1_0 { + +def thisFormatVersion = "1.0" + +// Hard-code class name string in case it changes in the future +def thisClassName = "org.apache.spark.mllib.tree.DecisionTreeModel" + +case class PredictData(predict: Double, prob: Double) { + def toPredict: Predict = new Predict(predict, prob) +} + +object PredictData { + def apply(p: Predict): PredictData = PredictData(p.predict, p.prob) + + def apply(r: Row): PredictData = PredictData(r.getDouble(0), r.getDouble(1)) +} + +case class SplitData( +feature: Int, +threshold: Double, +featureType: Int, +categories: Seq[Double]) { // TODO:
spark git commit: [SPARK-5597][MLLIB] save/load for decision trees and ensembles
Repository: spark Updated Branches: refs/heads/master bd0b5ea70 -> ef2f55b97 [SPARK-5597][MLLIB] save/load for decision trees and emsembles This is based on # from jkbradley with the following changes: 1. Node schema updated to ~~~ treeId: int nodeId: Int predict/ |- predict: Double |- prob: Double impurity: Double isLeaf: Boolean split/ |- feature: Int |- threshold: Double |- featureType: Int |- categories: Array[Double] leftNodeId: Integer rightNodeId: Integer infoGain: Double ~~~ 2. Some refactor of the implementation. Closes #. Author: Joseph K. Bradley Author: Xiangrui Meng Closes #4493 from mengxr/SPARK-5597 and squashes the following commits: 75e3bb6 [Xiangrui Meng] fix style 2b0033d [Xiangrui Meng] update tree export schema and refactor the implementation 45873a2 [Joseph K. Bradley] org imports 1d4c264 [Joseph K. Bradley] Added save/load for tree ensembles dcdbf85 [Joseph K. Bradley] added save/load for decision tree but need to generalize it to ensembles Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef2f55b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef2f55b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef2f55b9 Branch: refs/heads/master Commit: ef2f55b97f58fa06acb30e9e0172fb66fba383bc Parents: bd0b5ea Author: Joseph K. Bradley Authored: Mon Feb 9 22:09:07 2015 -0800 Committer: Xiangrui Meng Committed: Mon Feb 9 22:09:07 2015 -0800 -- .../mllib/tree/model/DecisionTreeModel.scala| 197 ++- .../mllib/tree/model/InformationGainStats.scala | 4 +- .../apache/spark/mllib/tree/model/Node.scala| 5 + .../apache/spark/mllib/tree/model/Predict.scala | 7 + .../mllib/tree/model/treeEnsembleModels.scala | 157 ++- .../spark/mllib/tree/DecisionTreeSuite.scala| 120 ++- .../mllib/tree/GradientBoostedTreesSuite.scala | 81 +--- .../spark/mllib/tree/RandomForestSuite.scala| 28 ++- 8 files changed, 561 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef2f55b9/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index a25e625..89ecf37 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -17,11 +17,17 @@ package org.apache.spark.mllib.tree.model +import scala.collection.mutable + +import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} /** * :: Experimental :: @@ -31,7 +37,7 @@ import org.apache.spark.rdd.RDD * @param algo algorithm type -- classification or regression */ @Experimental -class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable { +class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable with Saveable { /** * Predict values for a single data point using the model trained. 
@@ -98,4 +104,193 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable header + topNode.subtreeToString(2) } + override def save(sc: SparkContext, path: String): Unit = { +DecisionTreeModel.SaveLoadV1_0.save(sc, path, this) + } + + override protected def formatVersion: String = "1.0" +} + +object DecisionTreeModel extends Loader[DecisionTreeModel] { + + private[tree] object SaveLoadV1_0 { + +def thisFormatVersion = "1.0" + +// Hard-code class name string in case it changes in the future +def thisClassName = "org.apache.spark.mllib.tree.DecisionTreeModel" + +case class PredictData(predict: Double, prob: Double) { + def toPredict: Predict = new Predict(predict, prob) +} + +object PredictData { + def apply(p: Predict): PredictData = PredictData(p.predict, p.prob) + + def apply(r: Row): PredictData = PredictData(r.getDouble(0), r.getDouble(1)) +} + +case class SplitData( +feature: Int, +threshold: Double, +featureType: Int, +categories: Seq[Double]) { // TODO: Change to List once SPARK-3365 is fixed + def toSplit: Split = { +new Split(feature, threshold
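For reference, the new persistence support is exercised through `model.save(sc, path)` and `DecisionTreeModel.load(sc, path)` from the `Saveable`/`Loader` traits shown in the diff. A minimal sketch of the round trip; the training parameters and output path are illustrative, not taken from the patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Train a small classifier, persist it, and read it back.
def saveAndReload(sc: SparkContext): DecisionTreeModel = {
  val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
  val model = DecisionTree.trainClassifier(
    data, numClasses = 2, categoricalFeaturesInfo = Map[Int, Int](),
    impurity = "gini", maxDepth = 5, maxBins = 32)

  // save() writes metadata plus the node table sketched in the commit message.
  model.save(sc, "/tmp/decision-tree-model")

  // load() reads the same layout back into an equivalent model.
  DecisionTreeModel.load(sc, "/tmp/decision-tree-model")
}
```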
spark git commit: [SQL] Remove the duplicated code
Repository: spark Updated Branches: refs/heads/master a2d33d0b0 -> bd0b5ea70 [SQL] Remove the duplicated code Author: Cheng Hao Closes #4494 from chenghao-intel/tiny_code_change and squashes the following commits: 450dfe7 [Cheng Hao] remove the duplicated code Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd0b5ea7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd0b5ea7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd0b5ea7 Branch: refs/heads/master Commit: bd0b5ea708aa5b84adb67c039ec52408289718bb Parents: a2d33d0 Author: Cheng Hao Authored: Mon Feb 9 21:33:34 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 21:33:34 2015 -0800 -- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 5 - 1 file changed, 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bd0b5ea7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 81bcf5a..edf8a5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -342,11 +342,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommand( RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil - case LogicalDescribeCommand(table, isExtended) => -val resultPlan = self.sqlContext.executePlan(table).executedPlan -ExecutedCommand( - RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil - case _ => Nil } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Remove the duplicated code
Repository: spark Updated Branches: refs/heads/branch-1.3 6ddbca494 -> 663d34ec8 [SQL] Remove the duplicated code Author: Cheng Hao Closes #4494 from chenghao-intel/tiny_code_change and squashes the following commits: 450dfe7 [Cheng Hao] remove the duplicated code (cherry picked from commit bd0b5ea708aa5b84adb67c039ec52408289718bb) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/663d34ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/663d34ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/663d34ec Branch: refs/heads/branch-1.3 Commit: 663d34ec8116d411a9ce547a9c9a23c83fe64351 Parents: 6ddbca4 Author: Cheng Hao Authored: Mon Feb 9 21:33:34 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 21:33:39 2015 -0800 -- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 5 - 1 file changed, 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/663d34ec/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 81bcf5a..edf8a5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -342,11 +342,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommand( RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil - case LogicalDescribeCommand(table, isExtended) => -val resultPlan = self.sqlContext.executePlan(table).executedPlan -ExecutedCommand( - RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil - case _ => Nil } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
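The deleted clause is an exact duplicate of the `LogicalDescribeCommand` case that appears earlier in the same pattern match, so it could never be selected and removing it leaves planning behavior unchanged. A tiny standalone illustration (deliberately unrelated to Spark's classes) of why a repeated case is dead code:

```scala
// The second, identical pattern can never match because the first one is tried first;
// scalac flags it with an "unreachable code" warning.
def plan(cmd: String): String = cmd match {
  case "DESCRIBE" => "planned by the first case"
  case "DESCRIBE" => "never reached"
  case other      => s"no plan for: $other"
}

// plan("DESCRIBE") == "planned by the first case"
```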
spark git commit: [SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps
Repository: spark Updated Branches: refs/heads/master a95ed5215 -> a2d33d0b0 [SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps The updateShuffleReadMetrics method in TaskMetrics (called by the executor heartbeater) will currently always add a ShuffleReadMetrics to TaskMetrics (with values set to 0), even when the task didn't read any shuffle data. ShuffleReadMetrics should only be added if the task reads shuffle data. Author: Kay Ousterhout Closes #4488 from kayousterhout/SPARK-5701 and squashes the following commits: 673ed58 [Kay Ousterhout] SPARK-5701: Only set ShuffleReadMetrics when task has shuffle deps Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d33d0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d33d0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d33d0b Branch: refs/heads/master Commit: a2d33d0b01af87e931d9d883638a52d7a86f6248 Parents: a95ed52 Author: Kay Ousterhout Authored: Mon Feb 9 21:22:09 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:22:09 2015 -0800 -- .../org/apache/spark/executor/TaskMetrics.scala | 22 --- .../spark/executor/TaskMetricsSuite.scala | 28 2 files changed, 40 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2d33d0b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d056591..bf3f1e4 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -177,8 +177,8 @@ class TaskMetrics extends Serializable { * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed, * we can store all the different inputMetrics (one per readMethod). */ - private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): -InputMetrics =synchronized { + private[spark] def getInputMetricsForReadMethod( + readMethod: DataReadMethod): InputMetrics = synchronized { _inputMetrics match { case None => val metrics = new InputMetrics(readMethod) @@ -195,15 +195,17 @@ class TaskMetrics extends Serializable { * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. 
*/ private[spark] def updateShuffleReadMetrics(): Unit = synchronized { -val merged = new ShuffleReadMetrics() -for (depMetrics <- depsShuffleReadMetrics) { - merged.incFetchWaitTime(depMetrics.fetchWaitTime) - merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) - merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) - merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.incRecordsRead(depMetrics.recordsRead) +if (!depsShuffleReadMetrics.isEmpty) { + val merged = new ShuffleReadMetrics() + for (depMetrics <- depsShuffleReadMetrics) { +merged.incFetchWaitTime(depMetrics.fetchWaitTime) +merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) +merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) +merged.incRemoteBytesRead(depMetrics.remoteBytesRead) +merged.incRecordsRead(depMetrics.recordsRead) + } + _shuffleReadMetrics = Some(merged) } -_shuffleReadMetrics = Some(merged) } private[spark] def updateInputMetrics(): Unit = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/a2d33d0b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala new file mode 100644 index 000..326e203 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
spark git commit: [SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps
Repository: spark Updated Branches: refs/heads/branch-1.3 832625509 -> 6ddbca494 [SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps The updateShuffleReadMetrics method in TaskMetrics (called by the executor heartbeater) will currently always add a ShuffleReadMetrics to TaskMetrics (with values set to 0), even when the task didn't read any shuffle data. ShuffleReadMetrics should only be added if the task reads shuffle data. Author: Kay Ousterhout Closes #4488 from kayousterhout/SPARK-5701 and squashes the following commits: 673ed58 [Kay Ousterhout] SPARK-5701: Only set ShuffleReadMetrics when task has shuffle deps (cherry picked from commit a2d33d0b01af87e931d9d883638a52d7a86f6248) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ddbca49 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ddbca49 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ddbca49 Branch: refs/heads/branch-1.3 Commit: 6ddbca494f49f9e2ef296eb7a604a133733966aa Parents: 8326255 Author: Kay Ousterhout Authored: Mon Feb 9 21:22:09 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:22:14 2015 -0800 -- .../org/apache/spark/executor/TaskMetrics.scala | 22 --- .../spark/executor/TaskMetricsSuite.scala | 28 2 files changed, 40 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6ddbca49/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d056591..bf3f1e4 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -177,8 +177,8 @@ class TaskMetrics extends Serializable { * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed, * we can store all the different inputMetrics (one per readMethod). */ - private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): -InputMetrics =synchronized { + private[spark] def getInputMetricsForReadMethod( + readMethod: DataReadMethod): InputMetrics = synchronized { _inputMetrics match { case None => val metrics = new InputMetrics(readMethod) @@ -195,15 +195,17 @@ class TaskMetrics extends Serializable { * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. 
*/ private[spark] def updateShuffleReadMetrics(): Unit = synchronized { -val merged = new ShuffleReadMetrics() -for (depMetrics <- depsShuffleReadMetrics) { - merged.incFetchWaitTime(depMetrics.fetchWaitTime) - merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) - merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) - merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.incRecordsRead(depMetrics.recordsRead) +if (!depsShuffleReadMetrics.isEmpty) { + val merged = new ShuffleReadMetrics() + for (depMetrics <- depsShuffleReadMetrics) { +merged.incFetchWaitTime(depMetrics.fetchWaitTime) +merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) +merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) +merged.incRemoteBytesRead(depMetrics.remoteBytesRead) +merged.incRecordsRead(depMetrics.recordsRead) + } + _shuffleReadMetrics = Some(merged) } -_shuffleReadMetrics = Some(merged) } private[spark] def updateInputMetrics(): Unit = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/6ddbca49/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala new file mode 100644 index 000..326e203 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distri
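The shape of the fix is to publish merged shuffle-read metrics only when at least one shuffle dependency actually registered metrics, instead of always emitting an all-zero record. A simplified sketch of that pattern using made-up types, not Spark's actual `TaskMetrics`/`ShuffleReadMetrics` classes:

```scala
// Simplified stand-in for per-dependency shuffle-read counters.
case class ShuffleReadStats(fetchWaitTime: Long, remoteBytesRead: Long, recordsRead: Long)

def mergeShuffleReadStats(perDependency: Seq[ShuffleReadStats]): Option[ShuffleReadStats] =
  if (perDependency.isEmpty) {
    // Task had no shuffle dependencies: report nothing instead of an all-zero record.
    None
  } else {
    Some(perDependency.reduce { (a, b) =>
      ShuffleReadStats(
        a.fetchWaitTime + b.fetchWaitTime,
        a.remoteBytesRead + b.remoteBytesRead,
        a.recordsRead + b.recordsRead)
    })
  }
```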
spark git commit: [SPARK-5703] AllJobsPage throws empty.max exception
Repository: spark Updated Branches: refs/heads/branch-1.2 515f65804 -> 53de2378e [SPARK-5703] AllJobsPage throws empty.max exception If you have a `SparkListenerJobEnd` event without the corresponding `SparkListenerJobStart` event, then `JobProgressListener` will create an empty `JobUIData` with an empty `stageIds` list. However, later in `AllJobsPage` we call `stageIds.max`. If this is empty, it will throw an exception. This crashed my history server. Author: Andrew Or Closes #4490 from andrewor14/jobs-page-max and squashes the following commits: 21797d3 [Andrew Or] Check nonEmpty before calling max (cherry picked from commit a95ed52157473fb0e42e910ee15270e7f0edf943) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53de2378 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53de2378 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53de2378 Branch: refs/heads/branch-1.2 Commit: 53de2378e6ba7d72a5b5a91eee03c63f8fab62ff Parents: 515f658 Author: Andrew Or Authored: Mon Feb 9 21:18:48 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:18:59 2015 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53de2378/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index ea2d187..8ab26a7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -43,7 +43,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { - val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max) + val lastStageInfo = Option(job.stageIds) +.filter(_.nonEmpty) +.flatMap { ids => listener.stageIdToInfo.get(ids.max) } val lastStageData = lastStageInfo.flatMap { s => listener.stageIdToData.get((s.stageId, s.attemptId)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5703] AllJobsPage throws empty.max exception
Repository: spark Updated Branches: refs/heads/branch-1.3 6a1e0f967 -> 832625509 [SPARK-5703] AllJobsPage throws empty.max exception If you have a `SparkListenerJobEnd` event without the corresponding `SparkListenerJobStart` event, then `JobProgressListener` will create an empty `JobUIData` with an empty `stageIds` list. However, later in `AllJobsPage` we call `stageIds.max`. If this is empty, it will throw an exception. This crashed my history server. Author: Andrew Or Closes #4490 from andrewor14/jobs-page-max and squashes the following commits: 21797d3 [Andrew Or] Check nonEmpty before calling max (cherry picked from commit a95ed52157473fb0e42e910ee15270e7f0edf943) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83262550 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83262550 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83262550 Branch: refs/heads/branch-1.3 Commit: 832625509c36332baad377448f55a6cfc0337121 Parents: 6a1e0f9 Author: Andrew Or Authored: Mon Feb 9 21:18:48 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:18:53 2015 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83262550/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 045c69d..bd923d7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -42,7 +42,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { - val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max) + val lastStageInfo = Option(job.stageIds) +.filter(_.nonEmpty) +.flatMap { ids => listener.stageIdToInfo.get(ids.max) } val lastStageData = lastStageInfo.flatMap { s => listener.stageIdToData.get((s.stageId, s.attemptId)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5703] AllJobsPage throws empty.max exception
Repository: spark Updated Branches: refs/heads/master 20a601310 -> a95ed5215 [SPARK-5703] AllJobsPage throws empty.max exception If you have a `SparkListenerJobEnd` event without the corresponding `SparkListenerJobStart` event, then `JobProgressListener` will create an empty `JobUIData` with an empty `stageIds` list. However, later in `AllJobsPage` we call `stageIds.max`. If this is empty, it will throw an exception. This crashed my history server. Author: Andrew Or Closes #4490 from andrewor14/jobs-page-max and squashes the following commits: 21797d3 [Andrew Or] Check nonEmpty before calling max Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a95ed521 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a95ed521 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a95ed521 Branch: refs/heads/master Commit: a95ed52157473fb0e42e910ee15270e7f0edf943 Parents: 20a6013 Author: Andrew Or Authored: Mon Feb 9 21:18:48 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:18:48 2015 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a95ed521/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 045c69d..bd923d7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -42,7 +42,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { - val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max) + val lastStageInfo = Option(job.stageIds) +.filter(_.nonEmpty) +.flatMap { ids => listener.stageIdToInfo.get(ids.max) } val lastStageData = lastStageInfo.flatMap { s => listener.stageIdToData.get((s.stageId, s.attemptId)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
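The underlying failure is that `max` on an empty Scala collection throws `UnsupportedOperationException: empty.max`, so a `JobUIData` with no stage IDs brought down the page. A small standalone sketch of the guarded lookup the patch introduces; the map below is a stand-in for the listener's `stageIdToInfo` field:

```scala
// max on an empty collection throws, which is what crashed the history server:
//   Seq.empty[Int].max  // java.lang.UnsupportedOperationException: empty.max

val stageIdToInfo = Map(3 -> "info for stage 3")

def lastStageInfo(stageIds: Seq[Int]): Option[String] =
  Option(stageIds)              // also covers a null stageIds reference
    .filter(_.nonEmpty)         // empty list -> None instead of empty.max
    .flatMap(ids => stageIdToInfo.get(ids.max))

// lastStageInfo(Seq(1, 2, 3)) == Some("info for stage 3")
// lastStageInfo(Seq.empty)    == None
```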
[2/2] spark git commit: [SPARK-2996] Implement userClassPathFirst for driver, yarn.
[SPARK-2996] Implement userClassPathFirst for driver, yarn. Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath, instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice. To be able to achieve feature-parity, I also implemented the option for drivers (the existing option only applies to executors). So now there are two options, each controlling whether to apply userClassPathFirst to the driver or executors. The old option was deprecated, and aliased to the new one (`spark.executor.userClassPathFirst`). The existing "child-first" class loader also had to be fixed. It didn't handle resources, and it was also doing some things that ended up causing JVM errors depending on how things were being called. Author: Marcelo Vanzin Closes #3233 from vanzin/SPARK-2996 and squashes the following commits: 9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation. fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically. a8c69f1 [Marcelo Vanzin] Review feedback. cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test. 0fe [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaninful. fe970a7 [Marcelo Vanzin] Review feedback. 25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent. fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks. 2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation. b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a10f379 [Marcelo Vanzin] Some feedback. 3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 7b57cba [Marcelo Vanzin] Remove now outdated message. 5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 d1273b2 [Marcelo Vanzin] Add test file to rat exclude. fa1aafa [Marcelo Vanzin] Remove write check on user jars. 89d8072 [Marcelo Vanzin] Cleanups. a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode. 50afa5f [Marcelo Vanzin] Fix Yarn executor command line. 7d14397 [Marcelo Vanzin] Register user jars in executor up front. 7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst. 20373f5 [Marcelo Vanzin] Fix ClientBaseSuite. 55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit. 0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option. 4a84d87 [Marcelo Vanzin] Fix the child-first class loader. d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf. 46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst". 
a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit. 91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation. a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments. 89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode. (cherry picked from commit 20a6013106b56a1a1cc3e8cda092330ffbe77cc3) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a1e0f96 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a1e0f96 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a1e0f96 Branch: refs/heads/branch-1.3 Commit: 6a1e0f967286945db13d94aeb6ed19f0a347c236 Parents: ebf1df0 Author: Marcelo Vanzin Authored: Mon Feb 9 21:17:06 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:17:35 2015 -0800 -- .../main/scala/org/apache/spark/SparkConf.scala | 83 +- .../main/scala/org/apache/spark/TestUtils.scala | 19 +- .../scala/org/apache/spark/deploy/Client.scala | 5 +- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +- .../spark/deploy/master/ui/MasterPage.scala | 2 +- .../deploy/rest/StandaloneRestServer.scala | 2 +- .../spark/deploy/worker/DriverRunner.scala | 15 +- .../spark/deploy/worker/DriverWrapper.scala | 20 +- .../executor/CoarseGrainedExecutorBackend
[1/2] spark git commit: [SPARK-2996] Implement userClassPathFirst for driver, yarn.
Repository: spark Updated Branches: refs/heads/branch-1.3 ebf1df03d -> 6a1e0f967 http://git-wip-us.apache.org/repos/asf/spark/blob/6a1e0f96/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala -- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e39de82..0e37276 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -17,27 +17,34 @@ package org.apache.spark.deploy.yarn -import java.io.File +import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.util.Properties import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable -import com.google.common.base.Charsets +import com.google.common.base.Charsets.UTF_8 +import com.google.common.io.ByteStreams import com.google.common.io.Files import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils +/** + * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN + * applications, and require the Spark assembly to be built before they can be successfully + * run. + */ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { - // log4j configuration for the Yarn containers, so that their output is collected - // by Yarn instead of trying to overwrite unit-tests.log. + // log4j configuration for the YARN containers, so that their output is collected + // by YARN instead of trying to overwrite unit-tests.log. 
private val LOG4J_CONF = """ |log4j.rootCategory=DEBUG, console |log4j.appender.console=org.apache.log4j.ConsoleAppender @@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit | |from pyspark import SparkConf , SparkContext |if __name__ == "__main__": -|if len(sys.argv) != 3: -|print >> sys.stderr, "Usage: test.py [master] [result file]" +|if len(sys.argv) != 2: +|print >> sys.stderr, "Usage: test.py [result file]" |exit(-1) -|conf = SparkConf() -|conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode") -|sc = SparkContext(conf=conf) -|status = open(sys.argv[2],'w') +|sc = SparkContext(conf=SparkConf()) +|status = open(sys.argv[1],'w') |result = "failure" |rdd = sc.parallelize(range(10)) |cnt = rdd.count() @@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ - private var oldConf: Map[String, String] = _ + private var logConfDir: File = _ override def beforeAll() { super.beforeAll() tempDir = Utils.createTempDir() - -val logConfDir = new File(tempDir, "log4j") +logConfDir = new File(tempDir, "log4j") logConfDir.mkdir() val logConfFile = new File(logConfDir, "log4j.properties") -Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8) - -val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator + - sys.props("java.class.path") - -oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap +Files.write(LOG4J_CONF, logConfFile, UTF_8) yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) yarnCluster.init(new YarnConfiguration()) @@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") -config.foreach { e => - sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) -} fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) -val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) -sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) -sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome) -sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) -sys.props += ("spark.executor.instances" -> "1") -sys.props += ("spark.driver.extraClassPath" -> childClasspath) -sys.props += ("spark.executor.extraClassP
[2/2] spark git commit: [SPARK-2996] Implement userClassPathFirst for driver, yarn.
[SPARK-2996] Implement userClassPathFirst for driver, yarn. Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath, instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice. To be able to achieve feature-parity, I also implemented the option for drivers (the existing option only applies to executors). So now there are two options, each controlling whether to apply userClassPathFirst to the driver or executors. The old option was deprecated, and aliased to the new one (`spark.executor.userClassPathFirst`). The existing "child-first" class loader also had to be fixed. It didn't handle resources, and it was also doing some things that ended up causing JVM errors depending on how things were being called. Author: Marcelo Vanzin Closes #3233 from vanzin/SPARK-2996 and squashes the following commits: 9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation. fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically. a8c69f1 [Marcelo Vanzin] Review feedback. cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test. 0fe [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaninful. fe970a7 [Marcelo Vanzin] Review feedback. 25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent. fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks. 2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation. b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a10f379 [Marcelo Vanzin] Some feedback. 3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 7b57cba [Marcelo Vanzin] Remove now outdated message. 5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 d1273b2 [Marcelo Vanzin] Add test file to rat exclude. fa1aafa [Marcelo Vanzin] Remove write check on user jars. 89d8072 [Marcelo Vanzin] Cleanups. a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode. 50afa5f [Marcelo Vanzin] Fix Yarn executor command line. 7d14397 [Marcelo Vanzin] Register user jars in executor up front. 7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst. 20373f5 [Marcelo Vanzin] Fix ClientBaseSuite. 55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit. 0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option. 4a84d87 [Marcelo Vanzin] Fix the child-first class loader. d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf. 46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst". 
a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit. 91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation. a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments. 89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20a60131 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20a60131 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20a60131 Branch: refs/heads/master Commit: 20a6013106b56a1a1cc3e8cda092330ffbe77cc3 Parents: 36c4e1d Author: Marcelo Vanzin Authored: Mon Feb 9 21:17:06 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 21:17:28 2015 -0800 -- .../main/scala/org/apache/spark/SparkConf.scala | 83 +- .../main/scala/org/apache/spark/TestUtils.scala | 19 +- .../scala/org/apache/spark/deploy/Client.scala | 5 +- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +- .../spark/deploy/master/ui/MasterPage.scala | 2 +- .../deploy/rest/StandaloneRestServer.scala | 2 +- .../spark/deploy/worker/DriverRunner.scala | 15 +- .../spark/deploy/worker/DriverWrapper.scala | 20 +- .../executor/CoarseGrainedExecutorBackend.scala | 83 +- .../org/apache/spark/executor/Executor.scala| 52 ++-- .../spark/executor/
[1/2] spark git commit: [SPARK-2996] Implement userClassPathFirst for driver, yarn.
Repository: spark Updated Branches: refs/heads/master 36c4e1d75 -> 20a601310 http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala -- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e39de82..0e37276 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -17,27 +17,34 @@ package org.apache.spark.deploy.yarn -import java.io.File +import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.util.Properties import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable -import com.google.common.base.Charsets +import com.google.common.base.Charsets.UTF_8 +import com.google.common.io.ByteStreams import com.google.common.io.Files import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils +/** + * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN + * applications, and require the Spark assembly to be built before they can be successfully + * run. + */ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { - // log4j configuration for the Yarn containers, so that their output is collected - // by Yarn instead of trying to overwrite unit-tests.log. + // log4j configuration for the YARN containers, so that their output is collected + // by YARN instead of trying to overwrite unit-tests.log. 
private val LOG4J_CONF = """ |log4j.rootCategory=DEBUG, console |log4j.appender.console=org.apache.log4j.ConsoleAppender @@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit | |from pyspark import SparkConf , SparkContext |if __name__ == "__main__": -|if len(sys.argv) != 3: -|print >> sys.stderr, "Usage: test.py [master] [result file]" +|if len(sys.argv) != 2: +|print >> sys.stderr, "Usage: test.py [result file]" |exit(-1) -|conf = SparkConf() -|conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode") -|sc = SparkContext(conf=conf) -|status = open(sys.argv[2],'w') +|sc = SparkContext(conf=SparkConf()) +|status = open(sys.argv[1],'w') |result = "failure" |rdd = sc.parallelize(range(10)) |cnt = rdd.count() @@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ - private var oldConf: Map[String, String] = _ + private var logConfDir: File = _ override def beforeAll() { super.beforeAll() tempDir = Utils.createTempDir() - -val logConfDir = new File(tempDir, "log4j") +logConfDir = new File(tempDir, "log4j") logConfDir.mkdir() val logConfFile = new File(logConfDir, "log4j.properties") -Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8) - -val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator + - sys.props("java.class.path") - -oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap +Files.write(LOG4J_CONF, logConfFile, UTF_8) yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) yarnCluster.init(new YarnConfiguration()) @@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") -config.foreach { e => - sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) -} fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) -val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) -sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) -sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome) -sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) -sys.props += ("spark.executor.instances" -> "1") -sys.props += ("spark.driver.extraClassPath" -> childClasspath) -sys.props += ("spark.executor.extraClassPath"
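For reference, the two settings this change exposes are `spark.driver.userClassPathFirst` and `spark.executor.userClassPathFirst` (the latter aliases the deprecated `spark.files.userClassPathFirst`, and `spark.yarn.user.classpath.first` is deprecated as well). A hedged sketch of enabling them; in practice the driver-side flag is usually passed at launch time (for example via `spark-submit --conf`), because it has to shape how the driver JVM's own class loader is assembled.

```scala
import org.apache.spark.SparkConf

// Illustrative only: turning on the new class-path-first options from application code.
val conf = new SparkConf()
  .setAppName("UserClassPathFirstDemo")
  // Prefer classes from the user's jars over Spark's own when loading in the driver.
  .set("spark.driver.userClassPathFirst", "true")
  // Same preference on executors; successor to the deprecated
  // spark.files.userClassPathFirst and spark.yarn.user.classpath.first settings.
  .set("spark.executor.userClassPathFirst", "true")
```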
spark git commit: SPARK-4900 [MLLIB] MLlib SingularValueDecomposition ARPACK IllegalStateException
Repository: spark Updated Branches: refs/heads/master 31d435ecf -> 36c4e1d75 SPARK-4900 [MLLIB] MLlib SingularValueDecomposition ARPACK IllegalStateException Fix ARPACK error code mapping, at least. It's not yet clear whether the error is what we expect from ARPACK. If it isn't, not sure if that's to be treated as an MLlib or Breeze issue. Author: Sean Owen Closes #4485 from srowen/SPARK-4900 and squashes the following commits: 7355aa1 [Sean Owen] Fix ARPACK error code mapping Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36c4e1d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36c4e1d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36c4e1d7 Branch: refs/heads/master Commit: 36c4e1d75933dc843acb747b91dc12e75ad1df42 Parents: 31d435e Author: Sean Owen Authored: Mon Feb 9 21:13:58 2015 -0800 Committer: Xiangrui Meng Committed: Mon Feb 9 21:13:58 2015 -0800 -- .../org/apache/spark/mllib/linalg/EigenValueDecomposition.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36c4e1d7/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 9d6f975..866936a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -117,7 +117,7 @@ private[mllib] object EigenValueDecomposition { info.`val` match { case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + " Maximum number of iterations taken. (Refer ARPACK user guide for details)") -case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + +case 3 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + " No shifts could be applied. Try to increase NCV. " + "(Refer ARPACK user guide for details)") case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
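The one-line fix remaps ARPACK's info = 3 return code ("no shifts could be applied, try to increase NCV") to the right message; it was previously reported under case 2. Users reach this code path through MLlib's truncated SVD, so a hedged sketch of the call site follows. The matrix contents and k below are made up for illustration, and a toy matrix this small actually takes the local (non-ARPACK) path; ARPACK is only used for larger problems.

```
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hedged sketch: a truncated SVD on a RowMatrix. For large matrices MLlib
// delegates the underlying eigen-decomposition to ARPACK; when ARPACK cannot
// apply any shifts it returns info = 3, which this patch now reports with the
// correct message instead of mislabeling it as code 2.
object SvdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "svd-sketch")
    val rows = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0),
      Vectors.dense(4.0, 5.0, 6.0),
      Vectors.dense(7.0, 8.0, 10.0)
    ))
    val mat = new RowMatrix(rows)
    // Ask for the top 2 singular values/vectors; computeU = true also returns U.
    val svd = mat.computeSVD(2, computeU = true)
    println(s"singular values: ${svd.s}")
    sc.stop()
  }
}
```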
spark git commit: SPARK-4900 [MLLIB] MLlib SingularValueDecomposition ARPACK IllegalStateException
Repository: spark Updated Branches: refs/heads/branch-1.3 dad05e068 -> ebf1df03d SPARK-4900 [MLLIB] MLlib SingularValueDecomposition ARPACK IllegalStateException Fix ARPACK error code mapping, at least. It's not yet clear whether the error is what we expect from ARPACK. If it isn't, not sure if that's to be treated as an MLlib or Breeze issue. Author: Sean Owen Closes #4485 from srowen/SPARK-4900 and squashes the following commits: 7355aa1 [Sean Owen] Fix ARPACK error code mapping (cherry picked from commit 36c4e1d75933dc843acb747b91dc12e75ad1df42) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebf1df03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebf1df03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebf1df03 Branch: refs/heads/branch-1.3 Commit: ebf1df03d8ba447f92644f3b9c34d73ebbf4b020 Parents: dad05e0 Author: Sean Owen Authored: Mon Feb 9 21:13:58 2015 -0800 Committer: Xiangrui Meng Committed: Mon Feb 9 21:14:06 2015 -0800 -- .../org/apache/spark/mllib/linalg/EigenValueDecomposition.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ebf1df03/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 3515461..623bdc5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -114,7 +114,7 @@ private[mllib] object EigenValueDecomposition { info.`val` match { case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + " Maximum number of iterations taken. (Refer ARPACK user guide for details)") -case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + +case 3 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + " No shifts could be applied. Try to increase NCV. " + "(Refer ARPACK user guide for details)") case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Add a config option to print DAG.
Repository: spark Updated Branches: refs/heads/master 08488c175 -> 31d435ecf Add a config option to print DAG. Add a config option "spark.rddDebug.enable" to check whether to print DAG info. When "spark.rddDebug.enable" is true, it will print information about DAG in the log. Author: KaiXinXiaoLei Closes #4257 from KaiXinXiaoLei/DAGprint and squashes the following commits: d9fe42e [KaiXinXiaoLei] change log info c27ee76 [KaiXinXiaoLei] change log info 83c2b32 [KaiXinXiaoLei] change config option adcb14f [KaiXinXiaoLei] change the file. f4e7b9e [KaiXinXiaoLei] add a option to print DAG Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31d435ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31d435ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31d435ec Branch: refs/heads/master Commit: 31d435ecfdc24a788a6e38f4e82767bc275a3283 Parents: 08488c1 Author: KaiXinXiaoLei Authored: Mon Feb 9 20:58:58 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 20:58:58 2015 -0800 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31d435ec/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 71bdbc9..8d3c3d0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1420,6 +1420,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) +if (conf.getBoolean("spark.logLineage", false)) { + logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) +} dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
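Note that the option actually read by the merged code is spark.logLineage (visible in the diff), not the spark.rddDebug.enable mentioned in the description; the name was changed during review, per the squashed commits. A minimal sketch of using it, with illustrative master/app-name settings:

```
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: with spark.logLineage=true, SparkContext.runJob logs
// rdd.toDebugString (the RDD's recursive dependencies) before each job
// is handed to the DAGScheduler.
object LineageLoggingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("lineage-logging-sketch")
      .set("spark.logLineage", "true") // read with getBoolean, default false
    val sc = new SparkContext(conf)

    // Build a small lineage (parallelize -> map -> filter) and run a job;
    // the DAG info appears in the driver log at INFO level.
    val counted = sc.parallelize(1 to 100).map(_ * 2).filter(_ % 3 == 0).count()
    println(s"count = $counted")

    // The same string that gets logged can also be printed directly:
    println(sc.parallelize(1 to 100).map(_ * 2).toDebugString)
    sc.stop()
  }
}
```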
spark git commit: Add a config option to print DAG.
Repository: spark Updated Branches: refs/heads/branch-1.3 f0562b423 -> dad05e068 Add a config option to print DAG. Add a config option "spark.rddDebug.enable" to check whether to print DAG info. When "spark.rddDebug.enable" is true, it will print information about DAG in the log. Author: KaiXinXiaoLei Closes #4257 from KaiXinXiaoLei/DAGprint and squashes the following commits: d9fe42e [KaiXinXiaoLei] change log info c27ee76 [KaiXinXiaoLei] change log info 83c2b32 [KaiXinXiaoLei] change config option adcb14f [KaiXinXiaoLei] change the file. f4e7b9e [KaiXinXiaoLei] add a option to print DAG (cherry picked from commit 31d435ecfdc24a788a6e38f4e82767bc275a3283) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dad05e06 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dad05e06 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dad05e06 Branch: refs/heads/branch-1.3 Commit: dad05e068670f3ec5a016366b62af4b21159ac01 Parents: f0562b4 Author: KaiXinXiaoLei Authored: Mon Feb 9 20:58:58 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 20:59:05 2015 -0800 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dad05e06/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 71bdbc9..8d3c3d0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1420,6 +1420,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) +if (conf.getBoolean("spark.logLineage", false)) { + logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) +} dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[4/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
[SPARK-5469] restructure pyspark.sql into multiple files All the DataTypes moved into pyspark.sql.types The changes can be tracked by `--find-copies-harder -M25` ``` davieslocalhost:~/work/spark/python$ git diff --find-copies-harder -M25 --numstat master.. 2 5 python/docs/pyspark.ml.rst 0 3 python/docs/pyspark.mllib.rst 10 2 python/docs/pyspark.sql.rst 1 1 python/pyspark/mllib/linalg.py 21 14 python/pyspark/{mllib => sql}/__init__.py 14 2108python/pyspark/{sql.py => sql/context.py} 10 1772python/pyspark/{sql.py => sql/dataframe.py} 7 6 python/pyspark/{sql_tests.py => sql/tests.py} 8 1465python/pyspark/{sql.py => sql/types.py} 4 2 python/run-tests 1 1 sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala ``` Also `git blame -C -C python/pyspark/sql/context.py` to track the history. Author: Davies Liu Closes #4479 from davies/sql and squashes the following commits: 1b5f0a5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into sql 2b2b983 [Davies Liu] restructure pyspark.sql (cherry picked from commit 08488c175f2e8532cb6aab84da2abd9ad57179cc) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0562b42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0562b42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0562b42 Branch: refs/heads/branch-1.3 Commit: f0562b42383401d04dc10a3f7800906e5bc4c326 Parents: 62b1e1f Author: Davies Liu Authored: Mon Feb 9 20:49:22 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 20:58:31 2015 -0800 -- python/docs/pyspark.ml.rst |7 +- python/docs/pyspark.mllib.rst |3 - python/docs/pyspark.sql.rst | 12 +- python/pyspark/mllib/linalg.py |2 +- python/pyspark/sql.py | 2736 -- python/pyspark/sql/__init__.py | 42 + python/pyspark/sql/context.py | 642 python/pyspark/sql/dataframe.py | 974 +++ python/pyspark/sql/tests.py | 300 ++ python/pyspark/sql/types.py | 1279 python/pyspark/sql_tests.py | 299 -- python/run-tests|6 +- .../apache/spark/sql/test/ExamplePointUDT.scala |2 +- 13 files changed, 3255 insertions(+), 3049 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/docs/pyspark.ml.rst -- diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index f10d133..4da6d4a 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -1,11 +1,8 @@ pyspark.ml package = -Submodules --- - -pyspark.ml module -- +Module Context +-- .. automodule:: pyspark.ml :members: http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/docs/pyspark.mllib.rst -- diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index 4548b87..21f66ca 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -1,9 +1,6 @@ pyspark.mllib package = -Submodules --- - pyspark.mllib.classification module --- http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/docs/pyspark.sql.rst -- diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst index 65b3650..80c6f02 100644 --- a/python/docs/pyspark.sql.rst +++ b/python/docs/pyspark.sql.rst @@ -1,10 +1,18 @@ pyspark.sql module == -Module contents +Module Context +-- .. automodule:: pyspark.sql :members: :undoc-members: :show-inheritance: + + +pyspark.sql.types module + +.. 
automodule:: pyspark.sql.types +:members: +:undoc-members: +:show-inheritance: http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/pyspark/mllib/linalg.py -- diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 7f21190..597012b 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -29,7 +29,7 @@ import copy_reg import numpy as np -from pyspark.sql import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ +from pyspark.sql.types import UserDefinedType, StructFi
[1/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
Repository: spark Updated Branches: refs/heads/master d302c4800 -> 08488c175 http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py new file mode 100644 index 000..41afefe --- /dev/null +++ b/python/pyspark/sql/types.py @@ -0,0 +1,1279 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import decimal +import datetime +import keyword +import warnings +import json +import re +from array import array +from operator import itemgetter + + +__all__ = [ +"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", +"TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", +"LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType", ] + + +class DataType(object): + +"""Spark SQL DataType""" + +def __repr__(self): +return self.__class__.__name__ + +def __hash__(self): +return hash(str(self)) + +def __eq__(self, other): +return (isinstance(other, self.__class__) and +self.__dict__ == other.__dict__) + +def __ne__(self, other): +return not self.__eq__(other) + +@classmethod +def typeName(cls): +return cls.__name__[:-4].lower() + +def jsonValue(self): +return self.typeName() + +def json(self): +return json.dumps(self.jsonValue(), + separators=(',', ':'), + sort_keys=True) + + +class PrimitiveTypeSingleton(type): + +"""Metaclass for PrimitiveType""" + +_instances = {} + +def __call__(cls): +if cls not in cls._instances: +cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() +return cls._instances[cls] + + +class PrimitiveType(DataType): + +"""Spark SQL PrimitiveType""" + +__metaclass__ = PrimitiveTypeSingleton + +def __eq__(self, other): +# because they should be the same object +return self is other + + +class NullType(PrimitiveType): + +"""Spark SQL NullType + +The data type representing None, used for the types which has not +been inferred. +""" + + +class StringType(PrimitiveType): + +"""Spark SQL StringType + +The data type representing string values. +""" + + +class BinaryType(PrimitiveType): + +"""Spark SQL BinaryType + +The data type representing bytearray values. +""" + + +class BooleanType(PrimitiveType): + +"""Spark SQL BooleanType + +The data type representing bool values. +""" + + +class DateType(PrimitiveType): + +"""Spark SQL DateType + +The data type representing datetime.date values. +""" + + +class TimestampType(PrimitiveType): + +"""Spark SQL TimestampType + +The data type representing datetime.datetime values. +""" + + +class DecimalType(DataType): + +"""Spark SQL DecimalType + +The data type representing decimal.Decimal values. 
+""" + +def __init__(self, precision=None, scale=None): +self.precision = precision +self.scale = scale +self.hasPrecisionInfo = precision is not None + +def jsonValue(self): +if self.hasPrecisionInfo: +return "decimal(%d,%d)" % (self.precision, self.scale) +else: +return "decimal" + +def __repr__(self): +if self.hasPrecisionInfo: +return "DecimalType(%d,%d)" % (self.precision, self.scale) +else: +return "DecimalType()" + + +class DoubleType(PrimitiveType): + +"""Spark SQL DoubleType + +The data type representing float values. +""" + + +class FloatType(PrimitiveType): + +"""Spark SQL FloatType + +The data type representing single precision floating-point values. +""" + + +class ByteType(PrimitiveType): + +"""Spark SQL ByteType + +The data type representing int values with 1 singed byte. +""" + + +class IntegerType(PrimitiveType): + +"""Spark SQL IntegerType + +The data type representing int values. +""" + + +class LongType(PrimitiveType): + +"""Spark SQL LongType + +The data type represen
[3/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py deleted file mode 100644 index 6a6dfbc..000 --- a/python/pyspark/sql.py +++ /dev/null @@ -1,2736 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -public classes of Spark SQL: - -- L{SQLContext} - Main entry point for SQL functionality. -- L{DataFrame} - A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In - addition to normal RDD operations, DataFrames also support SQL. -- L{GroupedData} -- L{Column} - Column is a DataFrame with a single column. -- L{Row} - A Row of data returned by a Spark SQL query. -- L{HiveContext} - Main entry point for accessing data stored in Apache Hive.. -""" - -import sys -import itertools -import decimal -import datetime -import keyword -import warnings -import json -import re -import random -import os -from tempfile import NamedTemporaryFile -from array import array -from operator import itemgetter -from itertools import imap - -from py4j.protocol import Py4JError -from py4j.java_collections import ListConverter, MapConverter - -from pyspark.context import SparkContext -from pyspark.rdd import RDD, _prepare_for_python_RDD -from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \ -CloudPickleSerializer, UTF8Deserializer -from pyspark.storagelevel import StorageLevel -from pyspark.traceback_utils import SCCallSiteSync - - -__all__ = [ -"StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType", -"DoubleType", "FloatType", "ByteType", "IntegerType", "LongType", -"ShortType", "ArrayType", "MapType", "StructField", "StructType", -"SQLContext", "HiveContext", "DataFrame", "GroupedData", "Column", "Row", "Dsl", -"SchemaRDD"] - - -class DataType(object): - -"""Spark SQL DataType""" - -def __repr__(self): -return self.__class__.__name__ - -def __hash__(self): -return hash(str(self)) - -def __eq__(self, other): -return (isinstance(other, self.__class__) and -self.__dict__ == other.__dict__) - -def __ne__(self, other): -return not self.__eq__(other) - -@classmethod -def typeName(cls): -return cls.__name__[:-4].lower() - -def jsonValue(self): -return self.typeName() - -def json(self): -return json.dumps(self.jsonValue(), - separators=(',', ':'), - sort_keys=True) - - -class PrimitiveTypeSingleton(type): - -"""Metaclass for PrimitiveType""" - -_instances = {} - -def __call__(cls): -if cls not in cls._instances: -cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() -return cls._instances[cls] - - -class PrimitiveType(DataType): - -"""Spark SQL PrimitiveType""" - -__metaclass__ = PrimitiveTypeSingleton - -def __eq__(self, other): -# because they should be the same object 
-return self is other - - -class NullType(PrimitiveType): - -"""Spark SQL NullType - -The data type representing None, used for the types which has not -been inferred. -""" - - -class StringType(PrimitiveType): - -"""Spark SQL StringType - -The data type representing string values. -""" - - -class BinaryType(PrimitiveType): - -"""Spark SQL BinaryType - -The data type representing bytearray values. -""" - - -class BooleanType(PrimitiveType): - -"""Spark SQL BooleanType - -The data type representing bool values. -""" - - -class DateType(PrimitiveType): - -"""Spark SQL DateType - -The data type representing datetime.date values. -""" - - -class TimestampType(PrimitiveType): - -"""Spark SQL TimestampType - -The data type representing datetime.datetime values. -""" - - -class DecimalType(DataType): - -"""Spark SQL DecimalType - -The data type representing decimal.Decimal values. -""" - -def __init__(self, precision=None, scale=None): -self.precision = pre
[3/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py deleted file mode 100644 index 6a6dfbc..000 --- a/python/pyspark/sql.py +++ /dev/null @@ -1,2736 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -public classes of Spark SQL: - -- L{SQLContext} - Main entry point for SQL functionality. -- L{DataFrame} - A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In - addition to normal RDD operations, DataFrames also support SQL. -- L{GroupedData} -- L{Column} - Column is a DataFrame with a single column. -- L{Row} - A Row of data returned by a Spark SQL query. -- L{HiveContext} - Main entry point for accessing data stored in Apache Hive.. -""" - -import sys -import itertools -import decimal -import datetime -import keyword -import warnings -import json -import re -import random -import os -from tempfile import NamedTemporaryFile -from array import array -from operator import itemgetter -from itertools import imap - -from py4j.protocol import Py4JError -from py4j.java_collections import ListConverter, MapConverter - -from pyspark.context import SparkContext -from pyspark.rdd import RDD, _prepare_for_python_RDD -from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \ -CloudPickleSerializer, UTF8Deserializer -from pyspark.storagelevel import StorageLevel -from pyspark.traceback_utils import SCCallSiteSync - - -__all__ = [ -"StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType", -"DoubleType", "FloatType", "ByteType", "IntegerType", "LongType", -"ShortType", "ArrayType", "MapType", "StructField", "StructType", -"SQLContext", "HiveContext", "DataFrame", "GroupedData", "Column", "Row", "Dsl", -"SchemaRDD"] - - -class DataType(object): - -"""Spark SQL DataType""" - -def __repr__(self): -return self.__class__.__name__ - -def __hash__(self): -return hash(str(self)) - -def __eq__(self, other): -return (isinstance(other, self.__class__) and -self.__dict__ == other.__dict__) - -def __ne__(self, other): -return not self.__eq__(other) - -@classmethod -def typeName(cls): -return cls.__name__[:-4].lower() - -def jsonValue(self): -return self.typeName() - -def json(self): -return json.dumps(self.jsonValue(), - separators=(',', ':'), - sort_keys=True) - - -class PrimitiveTypeSingleton(type): - -"""Metaclass for PrimitiveType""" - -_instances = {} - -def __call__(cls): -if cls not in cls._instances: -cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() -return cls._instances[cls] - - -class PrimitiveType(DataType): - -"""Spark SQL PrimitiveType""" - -__metaclass__ = PrimitiveTypeSingleton - -def __eq__(self, other): -# because they should be the same object 
-return self is other - - -class NullType(PrimitiveType): - -"""Spark SQL NullType - -The data type representing None, used for the types which has not -been inferred. -""" - - -class StringType(PrimitiveType): - -"""Spark SQL StringType - -The data type representing string values. -""" - - -class BinaryType(PrimitiveType): - -"""Spark SQL BinaryType - -The data type representing bytearray values. -""" - - -class BooleanType(PrimitiveType): - -"""Spark SQL BooleanType - -The data type representing bool values. -""" - - -class DateType(PrimitiveType): - -"""Spark SQL DateType - -The data type representing datetime.date values. -""" - - -class TimestampType(PrimitiveType): - -"""Spark SQL TimestampType - -The data type representing datetime.datetime values. -""" - - -class DecimalType(DataType): - -"""Spark SQL DecimalType - -The data type representing decimal.Decimal values. -""" - -def __init__(self, precision=None, scale=None): -self.precision = pre
[2/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/pyspark/sql/__init__.py -- diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py new file mode 100644 index 000..0a5ba00 --- /dev/null +++ b/python/pyspark/sql/__init__.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +public classes of Spark SQL: + +- L{SQLContext} + Main entry point for SQL functionality. +- L{DataFrame} + A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In + addition to normal RDD operations, DataFrames also support SQL. +- L{GroupedData} +- L{Column} + Column is a DataFrame with a single column. +- L{Row} + A Row of data returned by a Spark SQL query. +- L{HiveContext} + Main entry point for accessing data stored in Apache Hive.. +""" + +from pyspark.sql.context import SQLContext, HiveContext +from pyspark.sql.types import Row +from pyspark.sql.dataframe import DataFrame, GroupedData, Column, Dsl, SchemaRDD + +__all__ = [ +'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', +'Dsl', +] http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py new file mode 100644 index 000..49f016a --- /dev/null +++ b/python/pyspark/sql/context.py @@ -0,0 +1,642 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import warnings +import json +from array import array +from itertools import imap + +from py4j.protocol import Py4JError + +from pyspark.rdd import _prepare_for_python_RDD +from pyspark.serializers import AutoBatchedSerializer, PickleSerializer +from pyspark.sql.types import StringType, StructType, _verify_type, \ +_infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter +from pyspark.sql.dataframe import DataFrame + +__all__ = ["SQLContext", "HiveContext"] + + +class SQLContext(object): + +"""Main entry point for Spark SQL functionality. 
+ +A SQLContext can be used create L{DataFrame}, register L{DataFrame} as +tables, execute SQL over tables, cache tables, and read parquet files. +""" + +def __init__(self, sparkContext, sqlContext=None): +"""Create a new SQLContext. + +:param sparkContext: The SparkContext to wrap. +:param sqlContext: An optional JVM Scala SQLContext. If set, we do not instatiate a new +SQLContext in the JVM, instead we make all calls to this object. + +>>> df = sqlCtx.inferSchema(rdd) +>>> sqlCtx.inferSchema(df) # doctest: +IGNORE_EXCEPTION_DETAIL +Traceback (most recent call last): +... +TypeError:... + +>>> bad_rdd = sc.parallelize([1,2,3]) +>>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL +Traceback (most recent call last): +... +ValueError:... + +>>> from datetime import datetime +>>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L, +... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), +... time=datetime(2014, 8, 1, 14, 1, 5))]) +>>> df = sqlCtx.inferSchema(allTypes) +>>> df.registerTempTable("allTypes") +>>> sqlCtx.sql('select i+1, d+1, not
[1/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
Repository: spark Updated Branches: refs/heads/branch-1.3 62b1e1fc0 -> f0562b423 http://git-wip-us.apache.org/repos/asf/spark/blob/f0562b42/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py new file mode 100644 index 000..41afefe --- /dev/null +++ b/python/pyspark/sql/types.py @@ -0,0 +1,1279 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import decimal +import datetime +import keyword +import warnings +import json +import re +from array import array +from operator import itemgetter + + +__all__ = [ +"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", +"TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", +"LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType", ] + + +class DataType(object): + +"""Spark SQL DataType""" + +def __repr__(self): +return self.__class__.__name__ + +def __hash__(self): +return hash(str(self)) + +def __eq__(self, other): +return (isinstance(other, self.__class__) and +self.__dict__ == other.__dict__) + +def __ne__(self, other): +return not self.__eq__(other) + +@classmethod +def typeName(cls): +return cls.__name__[:-4].lower() + +def jsonValue(self): +return self.typeName() + +def json(self): +return json.dumps(self.jsonValue(), + separators=(',', ':'), + sort_keys=True) + + +class PrimitiveTypeSingleton(type): + +"""Metaclass for PrimitiveType""" + +_instances = {} + +def __call__(cls): +if cls not in cls._instances: +cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__() +return cls._instances[cls] + + +class PrimitiveType(DataType): + +"""Spark SQL PrimitiveType""" + +__metaclass__ = PrimitiveTypeSingleton + +def __eq__(self, other): +# because they should be the same object +return self is other + + +class NullType(PrimitiveType): + +"""Spark SQL NullType + +The data type representing None, used for the types which has not +been inferred. +""" + + +class StringType(PrimitiveType): + +"""Spark SQL StringType + +The data type representing string values. +""" + + +class BinaryType(PrimitiveType): + +"""Spark SQL BinaryType + +The data type representing bytearray values. +""" + + +class BooleanType(PrimitiveType): + +"""Spark SQL BooleanType + +The data type representing bool values. +""" + + +class DateType(PrimitiveType): + +"""Spark SQL DateType + +The data type representing datetime.date values. +""" + + +class TimestampType(PrimitiveType): + +"""Spark SQL TimestampType + +The data type representing datetime.datetime values. +""" + + +class DecimalType(DataType): + +"""Spark SQL DecimalType + +The data type representing decimal.Decimal values. 
+""" + +def __init__(self, precision=None, scale=None): +self.precision = precision +self.scale = scale +self.hasPrecisionInfo = precision is not None + +def jsonValue(self): +if self.hasPrecisionInfo: +return "decimal(%d,%d)" % (self.precision, self.scale) +else: +return "decimal" + +def __repr__(self): +if self.hasPrecisionInfo: +return "DecimalType(%d,%d)" % (self.precision, self.scale) +else: +return "DecimalType()" + + +class DoubleType(PrimitiveType): + +"""Spark SQL DoubleType + +The data type representing float values. +""" + + +class FloatType(PrimitiveType): + +"""Spark SQL FloatType + +The data type representing single precision floating-point values. +""" + + +class ByteType(PrimitiveType): + +"""Spark SQL ByteType + +The data type representing int values with 1 singed byte. +""" + + +class IntegerType(PrimitiveType): + +"""Spark SQL IntegerType + +The data type representing int values. +""" + + +class LongType(PrimitiveType): + +"""Spark SQL LongType + +The data type repr
[2/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/pyspark/sql/__init__.py -- diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py new file mode 100644 index 000..0a5ba00 --- /dev/null +++ b/python/pyspark/sql/__init__.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +public classes of Spark SQL: + +- L{SQLContext} + Main entry point for SQL functionality. +- L{DataFrame} + A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In + addition to normal RDD operations, DataFrames also support SQL. +- L{GroupedData} +- L{Column} + Column is a DataFrame with a single column. +- L{Row} + A Row of data returned by a Spark SQL query. +- L{HiveContext} + Main entry point for accessing data stored in Apache Hive.. +""" + +from pyspark.sql.context import SQLContext, HiveContext +from pyspark.sql.types import Row +from pyspark.sql.dataframe import DataFrame, GroupedData, Column, Dsl, SchemaRDD + +__all__ = [ +'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', +'Dsl', +] http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py new file mode 100644 index 000..49f016a --- /dev/null +++ b/python/pyspark/sql/context.py @@ -0,0 +1,642 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import warnings +import json +from array import array +from itertools import imap + +from py4j.protocol import Py4JError + +from pyspark.rdd import _prepare_for_python_RDD +from pyspark.serializers import AutoBatchedSerializer, PickleSerializer +from pyspark.sql.types import StringType, StructType, _verify_type, \ +_infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter +from pyspark.sql.dataframe import DataFrame + +__all__ = ["SQLContext", "HiveContext"] + + +class SQLContext(object): + +"""Main entry point for Spark SQL functionality. 
+ +A SQLContext can be used create L{DataFrame}, register L{DataFrame} as +tables, execute SQL over tables, cache tables, and read parquet files. +""" + +def __init__(self, sparkContext, sqlContext=None): +"""Create a new SQLContext. + +:param sparkContext: The SparkContext to wrap. +:param sqlContext: An optional JVM Scala SQLContext. If set, we do not instatiate a new +SQLContext in the JVM, instead we make all calls to this object. + +>>> df = sqlCtx.inferSchema(rdd) +>>> sqlCtx.inferSchema(df) # doctest: +IGNORE_EXCEPTION_DETAIL +Traceback (most recent call last): +... +TypeError:... + +>>> bad_rdd = sc.parallelize([1,2,3]) +>>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL +Traceback (most recent call last): +... +ValueError:... + +>>> from datetime import datetime +>>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L, +... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), +... time=datetime(2014, 8, 1, 14, 1, 5))]) +>>> df = sqlCtx.inferSchema(allTypes) +>>> df.registerTempTable("allTypes") +>>> sqlCtx.sql('select i+1, d+1, not
[4/4] spark git commit: [SPARK-5469] restructure pyspark.sql into multiple files
[SPARK-5469] restructure pyspark.sql into multiple files All the DataTypes moved into pyspark.sql.types The changes can be tracked by `--find-copies-harder -M25` ``` davieslocalhost:~/work/spark/python$ git diff --find-copies-harder -M25 --numstat master.. 2 5 python/docs/pyspark.ml.rst 0 3 python/docs/pyspark.mllib.rst 10 2 python/docs/pyspark.sql.rst 1 1 python/pyspark/mllib/linalg.py 21 14 python/pyspark/{mllib => sql}/__init__.py 14 2108python/pyspark/{sql.py => sql/context.py} 10 1772python/pyspark/{sql.py => sql/dataframe.py} 7 6 python/pyspark/{sql_tests.py => sql/tests.py} 8 1465python/pyspark/{sql.py => sql/types.py} 4 2 python/run-tests 1 1 sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala ``` Also `git blame -C -C python/pyspark/sql/context.py` to track the history. Author: Davies Liu Closes #4479 from davies/sql and squashes the following commits: 1b5f0a5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into sql 2b2b983 [Davies Liu] restructure pyspark.sql Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08488c17 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08488c17 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08488c17 Branch: refs/heads/master Commit: 08488c175f2e8532cb6aab84da2abd9ad57179cc Parents: d302c48 Author: Davies Liu Authored: Mon Feb 9 20:49:22 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 20:49:22 2015 -0800 -- python/docs/pyspark.ml.rst |7 +- python/docs/pyspark.mllib.rst |3 - python/docs/pyspark.sql.rst | 12 +- python/pyspark/mllib/linalg.py |2 +- python/pyspark/sql.py | 2736 -- python/pyspark/sql/__init__.py | 42 + python/pyspark/sql/context.py | 642 python/pyspark/sql/dataframe.py | 974 +++ python/pyspark/sql/tests.py | 300 ++ python/pyspark/sql/types.py | 1279 python/pyspark/sql_tests.py | 299 -- python/run-tests|6 +- .../apache/spark/sql/test/ExamplePointUDT.scala |2 +- 13 files changed, 3255 insertions(+), 3049 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/docs/pyspark.ml.rst -- diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index f10d133..4da6d4a 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -1,11 +1,8 @@ pyspark.ml package = -Submodules --- - -pyspark.ml module -- +Module Context +-- .. automodule:: pyspark.ml :members: http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/docs/pyspark.mllib.rst -- diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index 4548b87..21f66ca 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -1,9 +1,6 @@ pyspark.mllib package = -Submodules --- - pyspark.mllib.classification module --- http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/docs/pyspark.sql.rst -- diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst index 65b3650..80c6f02 100644 --- a/python/docs/pyspark.sql.rst +++ b/python/docs/pyspark.sql.rst @@ -1,10 +1,18 @@ pyspark.sql module == -Module contents +Module Context +-- .. automodule:: pyspark.sql :members: :undoc-members: :show-inheritance: + + +pyspark.sql.types module + +.. 
automodule:: pyspark.sql.types +:members: +:undoc-members: +:show-inheritance: http://git-wip-us.apache.org/repos/asf/spark/blob/08488c17/python/pyspark/mllib/linalg.py -- diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 7f21190..597012b 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -29,7 +29,7 @@ import copy_reg import numpy as np -from pyspark.sql import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ +from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType --
spark git commit: [SPARK-5698] Do not let user request negative # of executors
Repository: spark Updated Branches: refs/heads/branch-1.3 71f0f5115 -> 62b1e1fc0 [SPARK-5698] Do not let user request negative # of executors Otherwise we might crash the ApplicationMaster. Why? Please see https://issues.apache.org/jira/browse/SPARK-5698. sryza I believe this is also relevant in your patch #4168. Author: Andrew Or Closes #4483 from andrewor14/da-negative and squashes the following commits: 53ed955 [Andrew Or] Throw IllegalArgumentException instead 0e89fd5 [Andrew Or] Check against negative requests (cherry picked from commit d302c4800bf2f74eceb731169ddf1766136b7398) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62b1e1fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62b1e1fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62b1e1fc Branch: refs/heads/branch-1.3 Commit: 62b1e1fc0815329580e774791bc7aadcc6f50fb1 Parents: 71f0f51 Author: Andrew Or Authored: Mon Feb 9 17:33:29 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 17:33:34 2015 -0800 -- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62b1e1fc/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9d2fb4f..f9ca934 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -314,6 +314,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste * Return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized { +if (numAdditionalExecutors < 0) { + throw new IllegalArgumentException( +"Attempted to request a negative number of additional executor(s) " + +s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!") +} logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") logDebug(s"Number of pending executors is now $numPendingExecutors") numPendingExecutors += numAdditionalExecutors - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
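requestExecutors is the developer API behind dynamic allocation, so the new guard is hit whenever an application (or the ExecutorAllocationManager) asks a coarse-grained backend such as YARN for additional executors. A hedged sketch of calling it directly; the master URL and executor count are illustrative, and the request is only acknowledged on backends that support it:

```
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: ask the cluster manager for more executors at runtime.
// After this patch a negative count fails fast in the driver with
// IllegalArgumentException instead of being forwarded to the
// ApplicationMaster, where it could crash the application.
object RequestExecutorsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("request-executors-sketch")
      .setMaster("yarn-client") // illustrative; needs a running YARN cluster
    val sc = new SparkContext(conf)

    val acknowledged = sc.requestExecutors(2) // request 2 additional executors
    println(s"request acknowledged: $acknowledged")

    // sc.requestExecutors(-1) would now throw IllegalArgumentException.
    sc.stop()
  }
}
```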
spark git commit: [SPARK-5698] Do not let user request negative # of executors
Repository: spark Updated Branches: refs/heads/branch-1.2 63eee523e -> 515f65804 [SPARK-5698] Do not let user request negative # of executors Otherwise we might crash the ApplicationMaster. Why? Please see https://issues.apache.org/jira/browse/SPARK-5698. sryza I believe this is also relevant in your patch #4168. Author: Andrew Or Closes #4483 from andrewor14/da-negative and squashes the following commits: 53ed955 [Andrew Or] Throw IllegalArgumentException instead 0e89fd5 [Andrew Or] Check against negative requests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515f6580 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515f6580 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515f6580 Branch: refs/heads/branch-1.2 Commit: 515f65804e2502b570ca8955cb82971056624cfe Parents: 63eee52 Author: Andrew Or Authored: Mon Feb 9 17:33:29 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 17:34:02 2015 -0800 -- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/515f6580/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index fe9914b..e4f504b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -308,6 +308,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste * Return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized { +if (numAdditionalExecutors < 0) { + throw new IllegalArgumentException( +"Attempted to request a negative number of additional executor(s) " + +s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!") +} logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") logDebug(s"Number of pending executors is now $numPendingExecutors") numPendingExecutors += numAdditionalExecutors - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5698] Do not let user request negative # of executors
Repository: spark Updated Branches: refs/heads/master 3ec3ad295 -> d302c4800 [SPARK-5698] Do not let user request negative # of executors Otherwise we might crash the ApplicationMaster. Why? Please see https://issues.apache.org/jira/browse/SPARK-5698. sryza I believe this is also relevant in your patch #4168. Author: Andrew Or Closes #4483 from andrewor14/da-negative and squashes the following commits: 53ed955 [Andrew Or] Throw IllegalArgumentException instead 0e89fd5 [Andrew Or] Check against negative requests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d302c480 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d302c480 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d302c480 Branch: refs/heads/master Commit: d302c4800bf2f74eceb731169ddf1766136b7398 Parents: 3ec3ad2 Author: Andrew Or Authored: Mon Feb 9 17:33:29 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 17:33:29 2015 -0800 -- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d302c480/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9d2fb4f..f9ca934 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -314,6 +314,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste * Return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized { +if (numAdditionalExecutors < 0) { + throw new IllegalArgumentException( +"Attempted to request a negative number of additional executor(s) " + +s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!") +} logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") logDebug(s"Number of pending executors is now $numPendingExecutors") numPendingExecutors += numAdditionalExecutors - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5699] [SQL] [Tests] Runs hive-thriftserver tests whenever SQL code is modified
Repository: spark Updated Branches: refs/heads/branch-1.3 e2bf59af1 -> 71f0f5115 [SPARK-5699] [SQL] [Tests] Runs hive-thriftserver tests whenever SQL code is modified [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4486) Author: Cheng Lian Closes #4486 from liancheng/spark-5699 and squashes the following commits: 538001d [Cheng Lian] Runs hive-thriftserver tests whenever SQL code is modified (cherry picked from commit 3ec3ad295ddd1435da68251b7479ffb60aec7037) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71f0f511 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71f0f511 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71f0f511 Branch: refs/heads/branch-1.3 Commit: 71f0f5115c95842efefdacb6ec5ca837ae628148 Parents: e2bf59a Author: Cheng Lian Authored: Mon Feb 9 16:52:05 2015 -0800 Committer: Cheng Lian Committed: Mon Feb 9 16:52:20 2015 -0800 -- dev/run-tests | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/71f0f511/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index 2257a56..4839587 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -36,7 +36,7 @@ function handle_error () { } -# Build against the right verison of Hadoop. +# Build against the right version of Hadoop. { if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then @@ -77,7 +77,7 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl" fi } -# Only run Hive tests if there are sql changes. +# Only run Hive tests if there are SQL changes. # Partial solution for SPARK-1455. if [ -n "$AMPLAB_JENKINS" ]; then git fetch origin master:master @@ -183,7 +183,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS if [ -n "$_SQL_TESTS_ONLY" ]; then # This must be an array of individual arguments. Otherwise, having one long string # will be interpreted as a single test, which doesn't work. -SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "mllib/test") +SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test") else SBT_MAVEN_TEST_ARGS=("test") fi - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5699] [SQL] [Tests] Runs hive-thriftserver tests whenever SQL code is modified
Repository: spark Updated Branches: refs/heads/master d08e7c2b4 -> 3ec3ad295 [SPARK-5699] [SQL] [Tests] Runs hive-thriftserver tests whenever SQL code is modified [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4486) Author: Cheng Lian Closes #4486 from liancheng/spark-5699 and squashes the following commits: 538001d [Cheng Lian] Runs hive-thriftserver tests whenever SQL code is modified Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ec3ad29 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ec3ad29 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ec3ad29 Branch: refs/heads/master Commit: 3ec3ad295ddd1435da68251b7479ffb60aec7037 Parents: d08e7c2 Author: Cheng Lian Authored: Mon Feb 9 16:52:05 2015 -0800 Committer: Cheng Lian Committed: Mon Feb 9 16:52:05 2015 -0800 -- dev/run-tests | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ec3ad29/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index 2257a56..4839587 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -36,7 +36,7 @@ function handle_error () { } -# Build against the right verison of Hadoop. +# Build against the right version of Hadoop. { if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then @@ -77,7 +77,7 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl" fi } -# Only run Hive tests if there are sql changes. +# Only run Hive tests if there are SQL changes. # Partial solution for SPARK-1455. if [ -n "$AMPLAB_JENKINS" ]; then git fetch origin master:master @@ -183,7 +183,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS if [ -n "$_SQL_TESTS_ONLY" ]; then # This must be an array of individual arguments. Otherwise, having one long string # will be interpreted as a single test, which doesn't work. -SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "mllib/test") +SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test") else SBT_MAVEN_TEST_ARGS=("test") fi - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5648][SQL] support "alter ... unset tblproperties("key")"
Repository: spark Updated Branches: refs/heads/branch-1.3 15f557fd9 -> e2bf59af1 [SPARK-5648][SQL] support "alter ... unset tblproperties("key")" make hivecontext support "alter ... unset tblproperties("key")" like : alter view viewName unset tblproperties("k") alter table tableName unset tblproperties("k") Author: DoingDone9 <799203...@qq.com> Closes #4424 from DoingDone9/unset and squashes the following commits: 6dd8bee [DoingDone9] support "alter ... unset tblproperties("key")" (cherry picked from commit d08e7c2b498584609cb3c7922eaaa2a0d115603f) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2bf59af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2bf59af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2bf59af Branch: refs/heads/branch-1.3 Commit: e2bf59af1aea7c59114da74bf6739d14f3ad9c60 Parents: 15f557f Author: DoingDone9 <799203...@qq.com> Authored: Mon Feb 9 16:40:26 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:40:42 2015 -0800 -- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2bf59af/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 2a4b880..f51af62 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -103,6 +103,7 @@ private[hive] object HiveQl { "TOK_CREATEINDEX", "TOK_DROPDATABASE", "TOK_DROPINDEX", +"TOK_DROPTABLE_PROPERTIES", "TOK_MSCK", "TOK_ALTERVIEW_ADDPARTS", @@ -111,6 +112,7 @@ private[hive] object HiveQl { "TOK_ALTERVIEW_PROPERTIES", "TOK_ALTERVIEW_RENAME", "TOK_CREATEVIEW", +"TOK_DROPVIEW_PROPERTIES", "TOK_DROPVIEW", "TOK_EXPORT", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5648][SQL] support "alter ... unset tblproperties("key")"
Repository: spark Updated Branches: refs/heads/master 0ee53ebce -> d08e7c2b4 [SPARK-5648][SQL] support "alter ... unset tblproperties("key")" make hivecontext support "alter ... unset tblproperties("key")" like : alter view viewName unset tblproperties("k") alter table tableName unset tblproperties("k") Author: DoingDone9 <799203...@qq.com> Closes #4424 from DoingDone9/unset and squashes the following commits: 6dd8bee [DoingDone9] support "alter ... unset tblproperties("key")" Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d08e7c2b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d08e7c2b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d08e7c2b Branch: refs/heads/master Commit: d08e7c2b498584609cb3c7922eaaa2a0d115603f Parents: 0ee53eb Author: DoingDone9 <799203...@qq.com> Authored: Mon Feb 9 16:40:26 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:40:26 2015 -0800 -- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d08e7c2b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 2a4b880..f51af62 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -103,6 +103,7 @@ private[hive] object HiveQl { "TOK_CREATEINDEX", "TOK_DROPDATABASE", "TOK_DROPINDEX", +"TOK_DROPTABLE_PROPERTIES", "TOK_MSCK", "TOK_ALTERVIEW_ADDPARTS", @@ -111,6 +112,7 @@ private[hive] object HiveQl { "TOK_ALTERVIEW_PROPERTIES", "TOK_ALTERVIEW_RENAME", "TOK_CREATEVIEW", +"TOK_DROPVIEW_PROPERTIES", "TOK_DROPVIEW", "TOK_EXPORT", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
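A minimal usage sketch of the statements SPARK-5648 enables through HiveContext; the table name, view name, and property key below are placeholders rather than anything taken from the patch.

```scala
// Sketch only: "tableName", "viewName" and the property key "k" are placeholders.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[2]", "unset-tblproperties-example")
val hiveContext = new HiveContext(sc)

// Both statement forms are now accepted by HiveContext instead of being
// rejected as unsupported Hive syntax.
hiveContext.sql("""ALTER TABLE tableName UNSET TBLPROPERTIES ("k")""")
hiveContext.sql("""ALTER VIEW viewName UNSET TBLPROPERTIES ("k")""")
```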
spark git commit: [SPARK-2096][SQL] support dot notation on array of struct
Repository: spark Updated Branches: refs/heads/branch-1.3 ce2c89cfb -> 15f557fd9 [SPARK-2096][SQL] support dot notation on array of struct ~~The rule is simple: If you want `a.b` work, then `a` must be some level of nested array of struct(level 0 means just a StructType). And the result of `a.b` is same level of nested array of b-type. An optimization is: the resolve chain looks like `Attribute -> GetItem -> GetField -> GetField ...`, so we could transmit the nested array information between `GetItem` and `GetField` to avoid repeated computation of `innerDataType` and `containsNullList` of that nested array.~~ marmbrus Could you take a look? to evaluate `a.b`, if `a` is array of struct, then `a.b` means get field `b` on each element of `a`, and return a result of array. Author: Wenchen Fan Closes #2405 from cloud-fan/nested-array-dot and squashes the following commits: 08a228a [Wenchen Fan] support dot notation on array of struct (cherry picked from commit 0ee53ebce9944722e76b2b28fae79d9956be9f17) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15f557fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15f557fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15f557fd Branch: refs/heads/branch-1.3 Commit: 15f557fd9da947307d64bd7f4676d4d98684fd4a Parents: ce2c89c Author: Wenchen Fan Authored: Mon Feb 9 16:39:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:39:45 2015 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 30 ++--- .../sql/catalyst/expressions/complexTypes.scala | 34 +--- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../expressions/ExpressionEvaluationSuite.scala | 2 +- .../org/apache/spark/sql/json/JsonSuite.scala | 6 ++-- 5 files changed, 53 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/15f557fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0b59ed1..fb2ff01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{ArrayType, StructField, StructType, IntegerType} /** * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing @@ -311,18 +310,25 @@ class Analyzer(catalog: Catalog, * desired fields are found. 
*/ protected def resolveGetField(expr: Expression, fieldName: String): Expression = { + def findField(fields: Array[StructField]): Int = { +val checkField = (f: StructField) => resolver(f.name, fieldName) +val ordinal = fields.indexWhere(checkField) +if (ordinal == -1) { + sys.error( +s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}") +} else if (fields.indexWhere(checkField, ordinal + 1) != -1) { + sys.error(s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}") +} else { + ordinal +} + } expr.dataType match { case StructType(fields) => - val actualField = fields.filter(f => resolver(f.name, fieldName)) - if (actualField.length == 0) { -sys.error( - s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}") - } else if (actualField.length == 1) { -val field = actualField(0) -GetField(expr, field, fields.indexOf(field)) - } else { -sys.error(s"Ambiguous reference to fields ${actualField.mkString(", ")}") - } + val ordinal = findField(fields) + StructGetField(expr, fields(ordinal), ordinal) +case ArrayType(StructType(fields), containsNull) => + val ordinal = findField(fields) + ArrayGetField(expr, fields(ordinal), ordinal, containsNull) case otherType => sys.error(s"GetField is not valid on fields of type $otherType") } } http://git-wip-us.apache.org/repos/asf/spark/blob/15f557fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catal
spark git commit: [SPARK-2096][SQL] support dot notation on array of struct
Repository: spark Updated Branches: refs/heads/master 2a3629253 -> 0ee53ebce [SPARK-2096][SQL] support dot notation on array of struct ~~The rule is simple: If you want `a.b` work, then `a` must be some level of nested array of struct(level 0 means just a StructType). And the result of `a.b` is same level of nested array of b-type. An optimization is: the resolve chain looks like `Attribute -> GetItem -> GetField -> GetField ...`, so we could transmit the nested array information between `GetItem` and `GetField` to avoid repeated computation of `innerDataType` and `containsNullList` of that nested array.~~ marmbrus Could you take a look? to evaluate `a.b`, if `a` is array of struct, then `a.b` means get field `b` on each element of `a`, and return a result of array. Author: Wenchen Fan Closes #2405 from cloud-fan/nested-array-dot and squashes the following commits: 08a228a [Wenchen Fan] support dot notation on array of struct Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ee53ebc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ee53ebc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ee53ebc Branch: refs/heads/master Commit: 0ee53ebce9944722e76b2b28fae79d9956be9f17 Parents: 2a36292 Author: Wenchen Fan Authored: Mon Feb 9 16:39:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:39:34 2015 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 30 ++--- .../sql/catalyst/expressions/complexTypes.scala | 34 +--- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../expressions/ExpressionEvaluationSuite.scala | 2 +- .../org/apache/spark/sql/json/JsonSuite.scala | 6 ++-- 5 files changed, 53 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ee53ebc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0b59ed1..fb2ff01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{ArrayType, StructField, StructType, IntegerType} /** * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing @@ -311,18 +310,25 @@ class Analyzer(catalog: Catalog, * desired fields are found. 
*/ protected def resolveGetField(expr: Expression, fieldName: String): Expression = { + def findField(fields: Array[StructField]): Int = { +val checkField = (f: StructField) => resolver(f.name, fieldName) +val ordinal = fields.indexWhere(checkField) +if (ordinal == -1) { + sys.error( +s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}") +} else if (fields.indexWhere(checkField, ordinal + 1) != -1) { + sys.error(s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}") +} else { + ordinal +} + } expr.dataType match { case StructType(fields) => - val actualField = fields.filter(f => resolver(f.name, fieldName)) - if (actualField.length == 0) { -sys.error( - s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}") - } else if (actualField.length == 1) { -val field = actualField(0) -GetField(expr, field, fields.indexOf(field)) - } else { -sys.error(s"Ambiguous reference to fields ${actualField.mkString(", ")}") - } + val ordinal = findField(fields) + StructGetField(expr, fields(ordinal), ordinal) +case ArrayType(StructType(fields), containsNull) => + val ordinal = findField(fields) + ArrayGetField(expr, fields(ordinal), ordinal, containsNull) case otherType => sys.error(s"GetField is not valid on fields of type $otherType") } } http://git-wip-us.apache.org/repos/asf/spark/blob/0ee53ebc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala -- diff
spark git commit: [SPARK-5614][SQL] Predicate pushdown through Generate.
Repository: spark Updated Branches: refs/heads/master b8080aa86 -> 2a3629253 [SPARK-5614][SQL] Predicate pushdown through Generate. Now in Catalyst's rules, predicates can not be pushed through "Generate" nodes. Further more, partition pruning in HiveTableScan can not be applied on those queries involves "Generate". This makes such queries very inefficient. In practice, it finds patterns like ```scala Filter(predicate, Generate(generator, _, _, _, grandChild)) ``` and splits the predicate into 2 parts by referencing the generated column from Generate node or not. And a new Filter will be created for those conjuncts can be pushed beneath Generate node. If nothing left for the original Filter, it will be removed. For example, physical plan for query ```sql select len, bk from s_server lateral view explode(len_arr) len_table as len where len > 5 and day = '20150102'; ``` where 'day' is a partition column in metastore is like this in current version of Spark SQL: > Project [len, bk] > > Filter ((len > "5") && "(day = "20150102")") > > Generate explode(len_arr), true, false > > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, > None), None But theoretically the plan should be like this > Project [len, bk] > > Filter (len > "5") > > Generate explode(len_arr), true, false > > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, > None), Some(day = "20150102") Where partition pruning predicates can be pushed to HiveTableScan nodes. Author: Lu Yan Closes #4394 from ianluyan/ppd and squashes the following commits: a67dce9 [Lu Yan] Fix English grammar. 7cea911 [Lu Yan] Revised based on @marmbrus's opinions ffc59fc [Lu Yan] [SPARK-5614][SQL] Predicate pushdown through Generate. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a362925 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a362925 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a362925 Branch: refs/heads/master Commit: 2a36292534a1e9f7a501e88f69bfc3a09fb62cb3 Parents: b8080aa Author: Lu Yan Authored: Mon Feb 9 16:25:38 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:25:38 2015 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 25 .../optimizer/FilterPushdownSuite.scala | 63 +++- 2 files changed, 87 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a362925/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3bc48c9..fd58b96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -50,6 +50,7 @@ object DefaultOptimizer extends Optimizer { CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, + PushPredicateThroughGenerate, ColumnPruning) :: Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil @@ -456,6 +457,30 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] { } /** + * Push [[Filter]] operators through [[Generate]] operators. Parts of the predicate that reference + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath. 
+ */ +object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case filter @ Filter(condition, +generate @ Generate(generator, join, outer, alias, grandChild)) => + // Predicates that reference attributes produced by the `Generate` operator cannot + // be pushed below the operator. + val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { +conjunct => conjunct.references subsetOf grandChild.outputSet + } + if (pushDown.nonEmpty) { +val pushDownPredicate = pushDown.reduce(And) +val withPushdown = generate.copy(child = Filter(pushDownPredicate, grandChild)) +stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown) + } else { +filter + } + } +} + +/** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or right side of a join. Other * [[Filter]] conditions are moved into the `condition` of the [[Join]]. http://git-wip-us.apache.org/repos/asf/spark/blob/2a362925/sql/catalyst/src/test/scala/org/apache/spark/sql/cat
spark git commit: [SPARK-5614][SQL] Predicate pushdown through Generate.
Repository: spark Updated Branches: refs/heads/branch-1.3 379233cd0 -> ce2c89cfb [SPARK-5614][SQL] Predicate pushdown through Generate. Now in Catalyst's rules, predicates can not be pushed through "Generate" nodes. Further more, partition pruning in HiveTableScan can not be applied on those queries involves "Generate". This makes such queries very inefficient. In practice, it finds patterns like ```scala Filter(predicate, Generate(generator, _, _, _, grandChild)) ``` and splits the predicate into 2 parts by referencing the generated column from Generate node or not. And a new Filter will be created for those conjuncts can be pushed beneath Generate node. If nothing left for the original Filter, it will be removed. For example, physical plan for query ```sql select len, bk from s_server lateral view explode(len_arr) len_table as len where len > 5 and day = '20150102'; ``` where 'day' is a partition column in metastore is like this in current version of Spark SQL: > Project [len, bk] > > Filter ((len > "5") && "(day = "20150102")") > > Generate explode(len_arr), true, false > > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, > None), None But theoretically the plan should be like this > Project [len, bk] > > Filter (len > "5") > > Generate explode(len_arr), true, false > > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, > None), Some(day = "20150102") Where partition pruning predicates can be pushed to HiveTableScan nodes. Author: Lu Yan Closes #4394 from ianluyan/ppd and squashes the following commits: a67dce9 [Lu Yan] Fix English grammar. 7cea911 [Lu Yan] Revised based on @marmbrus's opinions ffc59fc [Lu Yan] [SPARK-5614][SQL] Predicate pushdown through Generate. (cherry picked from commit 2a36292534a1e9f7a501e88f69bfc3a09fb62cb3) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce2c89cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce2c89cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce2c89cf Branch: refs/heads/branch-1.3 Commit: ce2c89cfb0fc8950da95decd8384b431a7060fe9 Parents: 379233c Author: Lu Yan Authored: Mon Feb 9 16:25:38 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:26:01 2015 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 25 .../optimizer/FilterPushdownSuite.scala | 63 +++- 2 files changed, 87 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ce2c89cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3bc48c9..fd58b96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -50,6 +50,7 @@ object DefaultOptimizer extends Optimizer { CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, + PushPredicateThroughGenerate, ColumnPruning) :: Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil @@ -456,6 +457,30 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] { } /** + * Push [[Filter]] operators through [[Generate]] operators. Parts of the predicate that reference + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath. 
+ */ +object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case filter @ Filter(condition, +generate @ Generate(generator, join, outer, alias, grandChild)) => + // Predicates that reference attributes produced by the `Generate` operator cannot + // be pushed below the operator. + val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { +conjunct => conjunct.references subsetOf grandChild.outputSet + } + if (pushDown.nonEmpty) { +val pushDownPredicate = pushDown.reduce(And) +val withPushdown = generate.copy(child = Filter(pushDownPredicate, grandChild)) +stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown) + } else { +filter + } + } +} + +/** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or right side of a join. Other * [[Filter]] conditions are moved into the `condition` of the [[Join]].
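A hedged end-to-end sketch of the query shape SPARK-5614 targets, reusing the `s_server` example from the commit message; the table, its columns (`bk`, `len_arr`), and the `day` partition column are assumptions carried over from that example, and `explain(true)` is used only as a convenient way to inspect the optimized plan.

```scala
// Sketch only: the Hive table and its layout are assumed, per the example above.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[2]", "pushdown-through-generate")
val hiveContext = new HiveContext(sc)

val df = hiveContext.sql("""
  SELECT len, bk
  FROM s_server LATERAL VIEW explode(len_arr) len_table AS len
  WHERE len > 5 AND day = '20150102'
""")

// With PushPredicateThroughGenerate, `day = '20150102'` (which does not reference
// the generated column `len`) should move below the explode and become eligible
// for partition pruning, while `len > 5` stays above the Generate node.
df.explain(true)
```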
spark git commit: [SPARK-5696] [SQL] [HOTFIX] Asks HiveThriftServer2 to re-initialize log4j using Hive configurations
Repository: spark Updated Branches: refs/heads/master 5f0b30e59 -> b8080aa86 [SPARK-5696] [SQL] [HOTFIX] Asks HiveThriftServer2 to re-initialize log4j using Hive configurations In this way, log4j configurations overriden by jets3t-0.9.2.jar can be again overriden by Hive default log4j configurations. This might not be the best solution for this issue since it requires users to use `hive-log4j.properties` rather than `log4j.properties` to initialize `HiveThriftServer2` logging configurations, which can be confusing. The main purpose of this PR is to fix Jenkins PR build. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4484) Author: Cheng Lian Closes #4484 from liancheng/spark-5696 and squashes the following commits: df83956 [Cheng Lian] Hot fix: asks HiveThriftServer2 to re-initialize log4j using Hive configurations Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8080aa8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8080aa8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8080aa8 Branch: refs/heads/master Commit: b8080aa86d55e0467fd4328f10a2f0d6605e6cc6 Parents: 5f0b30e Author: Cheng Lian Authored: Mon Feb 9 16:23:12 2015 -0800 Committer: Cheng Lian Committed: Mon Feb 9 16:23:12 2015 -0800 -- .../apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala| 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b8080aa8/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6e07df1..525777a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory +import org.apache.hadoop.hive.common.LogUtils import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} @@ -54,6 +55,8 @@ object HiveThriftServer2 extends Logging { System.exit(-1) } +LogUtils.initHiveLog4j() + logInfo("Starting SparkContext") SparkSQLEnv.init() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5696] [SQL] [HOTFIX] Asks HiveThriftServer2 to re-initialize log4j using Hive configurations
Repository: spark Updated Branches: refs/heads/branch-1.3 e24160142 -> 379233cd0 [SPARK-5696] [SQL] [HOTFIX] Asks HiveThriftServer2 to re-initialize log4j using Hive configurations In this way, log4j configurations overriden by jets3t-0.9.2.jar can be again overriden by Hive default log4j configurations. This might not be the best solution for this issue since it requires users to use `hive-log4j.properties` rather than `log4j.properties` to initialize `HiveThriftServer2` logging configurations, which can be confusing. The main purpose of this PR is to fix Jenkins PR build. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4484) Author: Cheng Lian Closes #4484 from liancheng/spark-5696 and squashes the following commits: df83956 [Cheng Lian] Hot fix: asks HiveThriftServer2 to re-initialize log4j using Hive configurations (cherry picked from commit b8080aa86d55e0467fd4328f10a2f0d6605e6cc6) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/379233cd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/379233cd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/379233cd Branch: refs/heads/branch-1.3 Commit: 379233cd00d17c5c94d6f2cd57557b4fac8b1a66 Parents: e241601 Author: Cheng Lian Authored: Mon Feb 9 16:23:12 2015 -0800 Committer: Cheng Lian Committed: Mon Feb 9 16:23:28 2015 -0800 -- .../apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala| 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/379233cd/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6e07df1..525777a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory +import org.apache.hadoop.hive.common.LogUtils import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} @@ -54,6 +55,8 @@ object HiveThriftServer2 extends Logging { System.exit(-1) } +LogUtils.initHiveLog4j() + logInfo("Starting SparkContext") SparkSQLEnv.init() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Code cleanup.
Repository: spark Updated Branches: refs/heads/master 68b25cf69 -> 5f0b30e59 [SQL] Code cleanup. I added an unnecessary line of code in https://github.com/apache/spark/commit/13531dd97c08563e53dacdaeaf1102bdd13ef825. My bad. Let's delete it. Author: Yin Huai Closes #4482 from yhuai/unnecessaryCode and squashes the following commits: 3645af0 [Yin Huai] Code cleanup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f0b30e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f0b30e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f0b30e5 Branch: refs/heads/master Commit: 5f0b30e59cc6a3017168189d3aaf09402699dc3b Parents: 68b25cf Author: Yin Huai Authored: Mon Feb 9 16:20:42 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:20:42 2015 -0800 -- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f0b30e5/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c23575f..036efa8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -351,9 +351,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |) """.stripMargin) -new Path("/Users/yhuai/Desktop/whatever") - - val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Code cleanup.
Repository: spark Updated Branches: refs/heads/branch-1.3 a70dca025 -> e24160142 [SQL] Code cleanup. I added an unnecessary line of code in https://github.com/apache/spark/commit/13531dd97c08563e53dacdaeaf1102bdd13ef825. My bad. Let's delete it. Author: Yin Huai Closes #4482 from yhuai/unnecessaryCode and squashes the following commits: 3645af0 [Yin Huai] Code cleanup. (cherry picked from commit 5f0b30e59cc6a3017168189d3aaf09402699dc3b) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2416014 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2416014 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2416014 Branch: refs/heads/branch-1.3 Commit: e2416014206ac6e7207dac95fcccf2d2377bc3f5 Parents: a70dca0 Author: Yin Huai Authored: Mon Feb 9 16:20:42 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:20:51 2015 -0800 -- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2416014/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c23575f..036efa8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -351,9 +351,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |) """.stripMargin) -new Path("/Users/yhuai/Desktop/whatever") - - val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Add some missing DataFrame functions.
Repository: spark Updated Branches: refs/heads/master b884daa58 -> 68b25cf69 [SQL] Add some missing DataFrame functions. - as with a `Symbol` - distinct - sqlContext.emptyDataFrame - move add/remove col out of RDDApi section Author: Michael Armbrust Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits: 2004023 [Michael Armbrust] Add missing functions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68b25cf6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68b25cf6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68b25cf6 Branch: refs/heads/master Commit: 68b25cf695e0fce9e465288d5a053e540a3fccb4 Parents: b884daa Author: Michael Armbrust Authored: Mon Feb 9 16:02:56 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:02:56 2015 -0800 -- .../scala/org/apache/spark/sql/Column.scala | 9 ++ .../scala/org/apache/spark/sql/DataFrame.scala | 12 +-- .../org/apache/spark/sql/DataFrameImpl.scala| 34 .../apache/spark/sql/IncomputableColumn.scala | 10 +++--- .../scala/org/apache/spark/sql/RDDApi.scala | 2 ++ .../scala/org/apache/spark/sql/SQLContext.scala | 5 ++- 6 files changed, 51 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 878b2b0..1011bf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -550,6 +550,15 @@ trait Column extends DataFrame { override def as(alias: String): Column = exprToColumn(Alias(expr, alias)()) /** + * Gives the column an alias. + * {{{ + * // Renames colA to colB in select output. + * df.select($"colA".as('colB)) + * }}} + */ + override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)()) + + /** * Casts the column to a different data type. * {{{ * // Casts colA to IntegerType. http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 17ea3cd..6abfb78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] { def join(right: DataFrame, joinExprs: Column): DataFrame /** - * Join with another [[DataFrame]], usin g the given join expression. The following performs + * Join with another [[DataFrame]], using the given join expression. The following performs * a full outer join between `df1` and `df2`. * * {{{ @@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] { /** * Returns a new [[DataFrame]] with an alias set. */ - def as(name: String): DataFrame + def as(alias: String): DataFrame + + /** + * (Scala-specific) Returns a new [[DataFrame]] with an alias set. + */ + def as(alias: Symbol): DataFrame /** * Selects a set of expressions. @@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] { */ override def repartition(numPartitions: Int): DataFrame + /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. 
*/ + override def distinct: DataFrame + override def persist(): this.type override def persist(newLevel: StorageLevel): this.type http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index fa05a5d..7339329 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql]( }.toSeq :_*) } - override def as(name: String): DataFrame = Subquery(name, logicalPlan) + override def as(alias: String): DataFrame = Subquery(alias, logicalPlan) + + override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { val exprs = cols.zipWithIndex.map { @@ -215,7 +217,19 @@ private[sql] class DataFrameImpl
spark git commit: [SQL] Add some missing DataFrame functions.
Repository: spark Updated Branches: refs/heads/branch-1.3 1e2fab22b -> a70dca025 [SQL] Add some missing DataFrame functions. - as with a `Symbol` - distinct - sqlContext.emptyDataFrame - move add/remove col out of RDDApi section Author: Michael Armbrust Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits: 2004023 [Michael Armbrust] Add missing functions (cherry picked from commit 68b25cf695e0fce9e465288d5a053e540a3fccb4) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a70dca02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a70dca02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a70dca02 Branch: refs/heads/branch-1.3 Commit: a70dca025b24053a00b328375dd00339a5c72da2 Parents: 1e2fab2 Author: Michael Armbrust Authored: Mon Feb 9 16:02:56 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 16:03:05 2015 -0800 -- .../scala/org/apache/spark/sql/Column.scala | 9 ++ .../scala/org/apache/spark/sql/DataFrame.scala | 12 +-- .../org/apache/spark/sql/DataFrameImpl.scala| 34 .../apache/spark/sql/IncomputableColumn.scala | 10 +++--- .../scala/org/apache/spark/sql/RDDApi.scala | 2 ++ .../scala/org/apache/spark/sql/SQLContext.scala | 5 ++- 6 files changed, 51 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a70dca02/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 878b2b0..1011bf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -550,6 +550,15 @@ trait Column extends DataFrame { override def as(alias: String): Column = exprToColumn(Alias(expr, alias)()) /** + * Gives the column an alias. + * {{{ + * // Renames colA to colB in select output. + * df.select($"colA".as('colB)) + * }}} + */ + override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)()) + + /** * Casts the column to a different data type. * {{{ * // Casts colA to IntegerType. http://git-wip-us.apache.org/repos/asf/spark/blob/a70dca02/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 17ea3cd..6abfb78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] { def join(right: DataFrame, joinExprs: Column): DataFrame /** - * Join with another [[DataFrame]], usin g the given join expression. The following performs + * Join with another [[DataFrame]], using the given join expression. The following performs * a full outer join between `df1` and `df2`. * * {{{ @@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] { /** * Returns a new [[DataFrame]] with an alias set. */ - def as(name: String): DataFrame + def as(alias: String): DataFrame + + /** + * (Scala-specific) Returns a new [[DataFrame]] with an alias set. + */ + def as(alias: Symbol): DataFrame /** * Selects a set of expressions. @@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] { */ override def repartition(numPartitions: Int): DataFrame + /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. 
*/ + override def distinct: DataFrame + override def persist(): this.type override def persist(newLevel: StorageLevel): this.type http://git-wip-us.apache.org/repos/asf/spark/blob/a70dca02/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index fa05a5d..7339329 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql]( }.toSeq :_*) } - override def as(name: String): DataFrame = Subquery(name, logicalPlan) + override def as(alias: String): DataFrame = Subquery(alias, logicalPlan) + + override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*
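A short usage sketch of the helpers added in this commit; the input data and column names are placeholders.

```scala
// Sketch of the newly added DataFrame functions; data and names are illustrative.
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[2]", "dataframe-missing-functions")
val sqlContext = new SQLContext(sc)

val empty = sqlContext.emptyDataFrame                  // new: 0-row, 0-column DataFrame

val df = sqlContext.jsonRDD(sc.parallelize("""{"colA": 1}""" :: Nil))
val renamed = df.select(df("colA").as('colB))          // new: as(...) taking a Symbol
val deduped = df.unionAll(df).distinct                 // new: distinct on DataFrame
```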
spark git commit: [SPARK-5611] [EC2] Allow spark-ec2 repo and branch to be set on CLI of spark_ec2.py
Repository: spark Updated Branches: refs/heads/master f48199eb3 -> b884daa58 [SPARK-5611] [EC2] Allow spark-ec2 repo and branch to be set on CLI of spark_ec2.py and by extension, the ami-list Useful for using alternate spark-ec2 repos or branches. Author: Florian Verhein Closes #4385 from florianverhein/master and squashes the following commits: 7e2b4be [Florian Verhein] [SPARK-5611] [EC2] typo 8b653dc [Florian Verhein] [SPARK-5611] [EC2] Enforce only supporting spark-ec2 forks from github, log improvement bc4b0ed [Florian Verhein] [SPARK-5611] allow spark-ec2 repos with different names 8b5c551 [Florian Verhein] improve option naming, fix logging, fix lint failing, add guard to enforce spark-ec2 7724308 [Florian Verhein] [SPARK-5611] [EC2] fixes b42b68c [Florian Verhein] [SPARK-5611] [EC2] Allow spark-ec2 repo and branch to be set on CLI of spark_ec2.py Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b884daa5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b884daa5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b884daa5 Branch: refs/heads/master Commit: b884daa58084d4f42e2318894067565b94e07f9d Parents: f48199e Author: Florian Verhein Authored: Mon Feb 9 23:47:07 2015 + Committer: Sean Owen Committed: Mon Feb 9 23:47:07 2015 + -- ec2/spark_ec2.py | 37 - 1 file changed, 32 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b884daa5/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 87b2112..3e4c49c 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -62,10 +62,10 @@ VALID_SPARK_VERSIONS = set([ DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"; -MESOS_SPARK_EC2_BRANCH = "branch-1.3" -# A URL prefix from which to fetch AMI information -AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH) +# Default location to get the spark-ec2 scripts (and ami-list) from +DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"; +DEFAULT_SPARK_EC2_BRANCH = "branch-1.3" def setup_boto(): @@ -148,6 +148,14 @@ def parse_args(): default=DEFAULT_SPARK_GITHUB_REPO, help="Github repo from which to checkout supplied commit hash (default: %default)") parser.add_option( +"--spark-ec2-git-repo", +default=DEFAULT_SPARK_EC2_GITHUB_REPO, +help="Github repo from which to checkout spark-ec2 (default: %default)") +parser.add_option( +"--spark-ec2-git-branch", +default=DEFAULT_SPARK_EC2_BRANCH, +help="Github repo branch of spark-ec2 to use (default: %default)") +parser.add_option( "--hadoop-major-version", default="1", help="Major version of Hadoop (default: %default)") parser.add_option( @@ -333,7 +341,12 @@ def get_spark_ami(opts): print >> stderr,\ "Don't recognize %s, assuming type is pvm" % opts.instance_type -ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type) +# URL prefix from which to fetch AMI information +ami_prefix = "{r}/{b}/ami-list".format( +r=opts.spark_ec2_git_repo.replace("https://github.com";, "https://raw.github.com";, 1), +b=opts.spark_ec2_git_branch) + +ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type) try: ami = urllib2.urlopen(ami_path).read().strip() print "Spark AMI: " + ami @@ -650,12 +663,15 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # NOTE: We should clone the repository before running deploy_files to # prevent ec2-variables.sh from being overwritten +print "Cloning spark-ec2 
scripts from {r}/tree/{b} on master...".format( +r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch) ssh( host=master, opts=opts, command="rm -rf spark-ec2" + " && " -+ "git clone https://github.com/mesos/spark-ec2.git -b {b}".format(b=MESOS_SPARK_EC2_BRANCH) ++ "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo, + b=opts.spark_ec2_git_branch) ) print "Deploying files to master..." @@ -1038,6 +1054,17 @@ def real_main(): print >> stderr, "ebs-vol-num cannot be greater than 8" sys.exit(1) +# Prevent breaking ami_prefix (/, .git and startswith checks) +# Prevent forks with non spark-ec2 names for now. +if opts.spark_ec2_git_repo.endswith("/") or \ +opts.spark_ec2_git_repo.endswith(".git") or \ +not opts.spark_ec2_git_repo.startswith("https://github.com"
spark git commit: [SQL] Fix flaky SET test
Repository: spark Updated Branches: refs/heads/branch-1.1 03d4097bc -> 651ceaeb3 [SQL] Fix flaky SET test Author: Michael Armbrust Closes #4480 from marmbrus/fixSetTests and squashes the following commits: f2e501e [Michael Armbrust] [SQL] Fix flaky SET test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/651ceaeb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/651ceaeb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/651ceaeb Branch: refs/heads/branch-1.1 Commit: 651ceaeb31fe8b090d93a714e58a1e4efb58447e Parents: 03d4097 Author: Michael Armbrust Authored: Mon Feb 9 14:57:55 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 14:57:55 2015 -0800 -- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/651ceaeb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index cdf9844..32c2d76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -541,8 +541,8 @@ class HiveQuerySuite extends HiveComparisonTest { sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) -assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql(s"SET").collect().map(_.getString(0)) +assertResult(Set(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { + sql(s"SET").collect().map(_.getString(0)).toSet } assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { collectResults(sql("SET -v")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
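For context, a hedged illustration of why an order-sensitive assertion can be flaky here: SET presumably returns its key=value pairs in no guaranteed order, which is exactly what the switch from an Array to a Set comparison in the assertion works around.

```scala
// Illustration only (not from the patch): two runs can emit the same pairs in
// different orders, so an element-order comparison may fail spuriously while a
// Set comparison does not.
val runA = Array("spark.k1=v1", "spark.k2=v2")
val runB = Array("spark.k2=v2", "spark.k1=v1")

runA.sameElements(runB)       // false: order-sensitive
runA.toSet == runB.toSet      // true: order-insensitive, as the fixed test asserts
```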
spark git commit: [SPARK-5675][SQL] XyzType companion object should subclass XyzType
Repository: spark Updated Branches: refs/heads/branch-1.3 18c5a999b -> 1e2fab22b [SPARK-5675][SQL] XyzType companion object should subclass XyzType Otherwise, the following will always return false in Java. ```scala dataType instanceof StringType ``` Author: Reynold Xin Closes #4463 from rxin/type-companion-object and squashes the following commits: 04d5d8d [Reynold Xin] Comment. 976e11e [Reynold Xin] [SPARK-5675][SQL]StringType case object should be subclass of StringType class (cherry picked from commit f48199eb354d6ec8675c2c1f96c3005064058d66) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e2fab22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e2fab22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e2fab22 Branch: refs/heads/branch-1.3 Commit: 1e2fab22b81021c6fd84c13207ceeb5137b1b317 Parents: 18c5a99 Author: Reynold Xin Authored: Mon Feb 9 14:51:46 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 14:52:00 2015 -0800 -- .../org/apache/spark/sql/types/dataTypes.scala | 85 +--- 1 file changed, 73 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1e2fab22/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index 91efe32..2abb1ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -240,10 +240,16 @@ abstract class DataType { * @group dataType */ @DeveloperApi -case object NullType extends DataType { +class NullType private() extends DataType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "NullType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. override def defaultSize: Int = 1 } +case object NullType extends NullType + + protected[sql] object NativeType { val all = Seq( IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) @@ -292,7 +298,10 @@ protected[sql] abstract class NativeType extends DataType { * @group dataType */ @DeveloperApi -case object StringType extends NativeType with PrimitiveType { +class StringType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "StringType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. 
private[sql] type JvmType = String @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } private[sql] val ordering = implicitly[Ordering[JvmType]] @@ -303,6 +312,8 @@ case object StringType extends NativeType with PrimitiveType { override def defaultSize: Int = 4096 } +case object StringType extends StringType + /** * :: DeveloperApi :: @@ -313,7 +324,10 @@ case object StringType extends NativeType with PrimitiveType { * @group dataType */ @DeveloperApi -case object BinaryType extends NativeType with PrimitiveType { +class BinaryType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "BinaryType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. private[sql] type JvmType = Array[Byte] @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } private[sql] val ordering = new Ordering[JvmType] { @@ -332,6 +346,8 @@ case object BinaryType extends NativeType with PrimitiveType { override def defaultSize: Int = 4096 } +case object BinaryType extends BinaryType + /** * :: DeveloperApi :: @@ -341,7 +357,10 @@ case object BinaryType extends NativeType with PrimitiveType { *@group dataType */ @DeveloperApi -case object BooleanType extends NativeType with PrimitiveType { +class BooleanType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code. + // Defined with a private constructor so the companion obj
spark git commit: [SPARK-5675][SQL] XyzType companion object should subclass XyzType
Repository: spark Updated Branches: refs/heads/master 0765af9b2 -> f48199eb3 [SPARK-5675][SQL] XyzType companion object should subclass XyzType Otherwise, the following will always return false in Java. ```scala dataType instanceof StringType ``` Author: Reynold Xin Closes #4463 from rxin/type-companion-object and squashes the following commits: 04d5d8d [Reynold Xin] Comment. 976e11e [Reynold Xin] [SPARK-5675][SQL]StringType case object should be subclass of StringType class Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f48199eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f48199eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f48199eb Branch: refs/heads/master Commit: f48199eb354d6ec8675c2c1f96c3005064058d66 Parents: 0765af9 Author: Reynold Xin Authored: Mon Feb 9 14:51:46 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 14:51:46 2015 -0800 -- .../org/apache/spark/sql/types/dataTypes.scala | 85 +--- 1 file changed, 73 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f48199eb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index 91efe32..2abb1ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -240,10 +240,16 @@ abstract class DataType { * @group dataType */ @DeveloperApi -case object NullType extends DataType { +class NullType private() extends DataType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "NullType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. override def defaultSize: Int = 1 } +case object NullType extends NullType + + protected[sql] object NativeType { val all = Seq( IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) @@ -292,7 +298,10 @@ protected[sql] abstract class NativeType extends DataType { * @group dataType */ @DeveloperApi -case object StringType extends NativeType with PrimitiveType { +class StringType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "StringType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. private[sql] type JvmType = String @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } private[sql] val ordering = implicitly[Ordering[JvmType]] @@ -303,6 +312,8 @@ case object StringType extends NativeType with PrimitiveType { override def defaultSize: Int = 4096 } +case object StringType extends StringType + /** * :: DeveloperApi :: @@ -313,7 +324,10 @@ case object StringType extends NativeType with PrimitiveType { * @group dataType */ @DeveloperApi -case object BinaryType extends NativeType with PrimitiveType { +class BinaryType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. 
Otherwise, the companion object would be of type "BinaryType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. private[sql] type JvmType = Array[Byte] @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } private[sql] val ordering = new Ordering[JvmType] { @@ -332,6 +346,8 @@ case object BinaryType extends NativeType with PrimitiveType { override def defaultSize: Int = 4096 } +case object BinaryType extends BinaryType + /** * :: DeveloperApi :: @@ -341,7 +357,10 @@ case object BinaryType extends NativeType with PrimitiveType { *@group dataType */ @DeveloperApi -case object BooleanType extends NativeType with PrimitiveType { +class BooleanType private() extends NativeType with PrimitiveType { + // The companion object and this class is separated so the companion object also subclasses + // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. private[sql] type JvmType = Boolean @transient private[sql]
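A hedged Scala-side sketch of what SPARK-5675 enables; it mirrors the Java `dataType instanceof StringType` motivation using a runtime type check against the new `StringType` class.

```scala
// Sketch: because the StringType companion object now extends the StringType
// class, runtime class checks against StringType succeed for the singleton.
import org.apache.spark.sql.types._

val dt: DataType = StringType

dt.isInstanceOf[StringType]                        // true after this change
dt match {
  case _: StringType => "it is a string column"    // matches
  case _             => "something else"
}
```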
spark git commit: [SPARK-4905][STREAMING] FlumeStreamSuite fix.
Repository: spark Updated Branches: refs/heads/branch-1.2 97541b22e -> 63eee523e [SPARK-4905][STREAMING] FlumeStreamSuite fix. Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output. Author: Hari Shreedharan Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits: 550d363 [Hari Shreedharan] Fix imports. 8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors. af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix. (cherry picked from commit 0765af9b21e9204c410c7a849c7201bc3eda8cc3) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63eee523 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63eee523 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63eee523 Branch: refs/heads/branch-1.2 Commit: 63eee523ea0b39b1db1a656a8b28828d93d7 Parents: 97541b2 Author: Hari Shreedharan Authored: Mon Feb 9 14:17:14 2015 -0800 Committer: Josh Rosen Committed: Mon Feb 9 14:18:59 2015 -0800 -- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63eee523/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index f333e38..322de7b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import java.nio.charset.Charset import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps +import com.google.common.base.Charsets import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.source.avro @@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L val inputEvents = input.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) + event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) event } @@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L status should be (avro.Status.OK) } -val decoder = Charset.forName("UTF-8").newDecoder() eventually(timeout(10 seconds), interval(100 milliseconds)) { val outputEvents = outputBuffer.flatten.map { _.event } outputEvents.foreach { event => event.getHeaders.get("test") should be("header") } - val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) + val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8)) output should be (input) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
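The substance of the fix is swapping a `CharsetDecoder` for the plain `String` constructor with Guava's `Charsets.UTF_8`. A standalone sketch of the two decoding approaches, assuming Guava is on the classpath (this is not the test code itself):

```scala
// Sketch of the before/after decoding styles this commit swaps. Both recover a
// String from an event body buffer; the String constructor is simpler and
// carries no decoder state between uses.
import java.nio.ByteBuffer
import java.nio.charset.Charset

import com.google.common.base.Charsets

object DecodeSketch {
  def main(args: Array[String]): Unit = {
    val body: ByteBuffer = ByteBuffer.wrap("hello flume".getBytes(Charsets.UTF_8))

    // Old style: decode through a CharsetDecoder instance.
    val decoder = Charset.forName("UTF-8").newDecoder()
    val viaDecoder = decoder.decode(body.duplicate()).toString

    // New style (what the patch uses): String constructor over the backing array.
    val viaConstructor = new String(body.array(), Charsets.UTF_8)

    println(viaDecoder == viaConstructor) // true for well-formed UTF-8 input
  }
}
```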
spark git commit: [SPARK-4905][STREAMING] FlumeStreamSuite fix.
Repository: spark Updated Branches: refs/heads/master 6fe70d843 -> 0765af9b2 [SPARK-4905][STREAMING] FlumeStreamSuite fix. Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output. Author: Hari Shreedharan Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits: 550d363 [Hari Shreedharan] Fix imports. 8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors. af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0765af9b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0765af9b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0765af9b Branch: refs/heads/master Commit: 0765af9b21e9204c410c7a849c7201bc3eda8cc3 Parents: 6fe70d8 Author: Hari Shreedharan Authored: Mon Feb 9 14:17:14 2015 -0800 Committer: Josh Rosen Committed: Mon Feb 9 14:17:14 2015 -0800 -- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0765af9b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index f333e38..322de7b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import java.nio.charset.Charset import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps +import com.google.common.base.Charsets import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.source.avro @@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L val inputEvents = input.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) + event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) event } @@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L status should be (avro.Status.OK) } -val decoder = Charset.forName("UTF-8").newDecoder() eventually(timeout(10 seconds), interval(100 milliseconds)) { val outputEvents = outputBuffer.flatten.map { _.event } outputEvents.foreach { event => event.getHeaders.get("test") should be("header") } - val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) + val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8)) output should be (input) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4905][STREAMING] FlumeStreamSuite fix.
Repository: spark Updated Branches: refs/heads/branch-1.3 6a0144c63 -> 18c5a999b [SPARK-4905][STREAMING] FlumeStreamSuite fix. Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output. Author: Hari Shreedharan Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits: 550d363 [Hari Shreedharan] Fix imports. 8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors. af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix. (cherry picked from commit 0765af9b21e9204c410c7a849c7201bc3eda8cc3) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18c5a999 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18c5a999 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18c5a999 Branch: refs/heads/branch-1.3 Commit: 18c5a999b94b992dbb6fadf70a04b967c498353d Parents: 6a0144c Author: Hari Shreedharan Authored: Mon Feb 9 14:17:14 2015 -0800 Committer: Josh Rosen Committed: Mon Feb 9 14:17:36 2015 -0800 -- .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18c5a999/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index f333e38..322de7b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import java.nio.charset.Charset import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps +import com.google.common.base.Charsets import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.source.avro @@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L val inputEvents = input.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) + event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) event } @@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L status should be (avro.Status.OK) } -val decoder = Charset.forName("UTF-8").newDecoder() eventually(timeout(10 seconds), interval(100 milliseconds)) { val outputEvents = outputBuffer.flatten.map { _.event } outputEvents.foreach { event => event.getHeaders.get("test") should be("header") } - val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) + val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8)) output should be (input) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r1658579 - in /spark: releases/_posts/2015-02-09-spark-release-1-2-1.md site/releases/spark-release-1-2-1.html
Author: pwendell Date: Mon Feb 9 21:27:38 2015 New Revision: 1658579 URL: http://svn.apache.org/r1658579 Log: Adding release source doc Added: spark/releases/_posts/2015-02-09-spark-release-1-2-1.md Modified: spark/site/releases/spark-release-1-2-1.html Added: spark/releases/_posts/2015-02-09-spark-release-1-2-1.md URL: http://svn.apache.org/viewvc/spark/releases/_posts/2015-02-09-spark-release-1-2-1.md?rev=1658579&view=auto == --- spark/releases/_posts/2015-02-09-spark-release-1-2-1.md (added) +++ spark/releases/_posts/2015-02-09-spark-release-1-2-1.md Mon Feb 9 21:27:38 2015 @@ -0,0 +1,111 @@ +--- +layout: post +title: Spark Release 1.2.1 +categories: [] +tags: [] +status: publish +type: post +published: true +meta: + _edit_last: '4' + _wpas_done_all: '1' +--- + +Spark 1.2.1 is a maintenance release containing stability fixes. This release is based on the [branch-1.2](https://github.com/apache/spark/tree/branch-1.2) maintenance branch of Spark. We recommend all 1.2.0 users to upgrade to this stable release. Contributions to this release came from 69 developers. + +To download Spark 1.2.1 visit the downloads page. + +### Fixes +Spark 1.2.1 contains bug fixes in several components. Some of the more important fixes are highlighted below. You can visit the [Spark issue tracker](http://s.apache.org/Mpn) for the full list of fixes. + + Security +- Locks down file permissions for temporary file storage + + Spark Core +- Netty shuffle ignores spark.blockManager.port ([SPARK-4837](https://issues.apache.org/jira/browse/SPARK-4837)) +- MetricsServlet does not initialize properly ([SPARK-4595](https://issues.apache.org/jira/browse/SPARK-4595)) +- Repl and YARN dependencies are not published to Maven ([SPARK-5289](https://issues.apache.org/jira/browse/SPARK-5289)) +- SparkConf is not thread safe ([SPARK-5355](https://issues.apache.org/jira/browse/SPARK-5355)) +- Byte code errors when linking against Spark ([SPARK-2075](https://issues.apache.org/jira/browse/SPARK-2075)) + + SQL +- CACHE TABLE AS SELECT fails with Hive UDFs ([SPARK-5187](https://issues.apache.org/jira/browse/SPARK-5187)) +- Attributes are case sensitive when using a select query from a projection ([SPARK-4959](https://issues.apache.org/jira/browse/SPARK-4959)) +- Spark SQL built for Hive 13 fails under concurrent metadata queries ([SPARK-4908](https://issues.apache.org/jira/browse/SPARK-4908)) +- Throw "Expression not in GROUP BY" when using same expression in group by clause and select clause ([SPARK-4296](https://issues.apache.org/jira/browse/SPARK-4296)) + + Streaming +- Proper file clean up for write ahead logs ([SPARK-5147](https://issues.apache.org/jira/browse/SPARK-5147)) +- Error with existing files during checkpoint recovery ([SPARK-4835](https://issues.apache.org/jira/browse/SPARK-4835)) +- Socket Receiver does not stop when streaming context is stopped ([SPARK-2892](https://issues.apache.org/jira/browse/SPARK-2892)) + + PySpark +- Parallelizing lists or arrays is slow ([SPARK-5224](https://issues.apache.org/jira/browse/SPARK-5224)) +- Serializer bug when using zip ([SPARK-4841](https://issues.apache.org/jira/browse/SPARK-4841)) +- Support Vector types within a dictionary ([SPARK-5223](https://issues.apache.org/jira/browse/SPARK-5223)) + +### Contributors +The following developers contributed to this release: + + * Aaron Davidson -- Bug fixes in Core + * Alex Liu -- Improvements in Core and SQL; bug fixes in SQL + * Andrew Ash -- Documentation in Core + * Andrew Or -- Improvements in Core and YARN; bug fixes in Core and YARN 
+ * Bilna -- Test in Streaming + * Brennon York -- Bug fixes in Core + * Cheng Hao -- Bug fixes in Core and SQL + * Cheng Lian -- Bug fixes in Core + * Christophe Preaud -- Improvements in Core + * Dale Richardson -- Improvement in Core + * Davies Liu -- Bug fixes in Core, MLlib, and PySpark + * Derek Ma -- Bug fixes in Shuffle + * Earne -- Improvements in Core and GraphX + * GuoQiang Li -- Bug fixes in Core and YARN + * Hari Shreedharan -- Bug fixes in Streaming + * Ilayaperumal Gopinathan -- Bug fixes in Streaming + * Ilya Ganelin -- Bug fixes in Core and Shuffle + * Jacek Lewandowski -- Bug fixes in Core + * Jeremy Freeman -- Bug fixes in MLlib and PySpark + * Jongyoul Lee -- Documentation in Streaming; bug fixes in Core and Mesos + * Joseph K. Bradley -- Bug fixes in Core, MLlib, and PySpark + * Josh Rosen -- Improvements in Core and SQL; new features in Core; bug fixes in Streaming and PySpark + * Kanwaljit Singh -- Bug fixes in Core + * Kenji Kikushima -- Bug fixes in GraphX + * Kousuke Saruta -- Bug fixes in Core and Web UI + * Lianhui Wang -- Bug fixes in Core + * Madhu Siddalingaiah -- Documentation in Core + * Marcelo Vanzin -- Bug fixes in Core + * Michael Armbrust -- Improvements in Core; bug fixes in SQL + * Michael Davies -- Improvements in SQL + * Nan Zhu -- Improvements and
spark git commit: [SPARK-5691] Fixing wrong data structure lookup for dupe app registration
Repository: spark Updated Branches: refs/heads/branch-1.1 40bce6350 -> 03d4097bc [SPARK-5691] Fixing wrong data structure lookup for dupe app registration In Master's registerApplication method, it checks if the application had already registered by examining the addressToWorker hash map. In reality, it should refer to the addressToApp data structure, as this is what really tracks which apps have been registered. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03d4097b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03d4097b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03d4097b Branch: refs/heads/branch-1.1 Commit: 03d4097bc5cfad4f98192919a157478de1589d5b Parents: 40bce63 Author: mcheah Authored: Mon Feb 9 19:58:58 2015 +0100 Committer: Andrew Or Committed: Mon Feb 9 13:21:59 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03d4097b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index abbc6e7..d482415 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -615,7 +615,7 @@ private[spark] class Master( def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.path.address -if (addressToWorker.contains(appAddress)) { +if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
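Since the same one-line fix is applied across several branches below, here is a simplified standalone sketch of the intent (hypothetical stand-in types, not Spark's `Master`): the duplicate check has to consult the map that tracks registered applications, not the map of workers, or a re-registration at the same driver address slips through.

```scala
// Simplified sketch of the registration guard (hypothetical types). Keyed on the
// driver address, a second registration attempt from the same address is ignored.
import scala.collection.mutable

case class AppInfo(id: String, driverAddress: String)

class MiniMaster {
  private val addressToApp = mutable.HashMap[String, AppInfo]()

  def registerApplication(app: AppInfo): Unit = {
    val appAddress = app.driverAddress
    if (addressToApp.contains(appAddress)) {
      // Looking this up in a worker map instead (the original bug) would let
      // duplicate apps through, because workers and apps use different addresses.
      println(s"Attempted to re-register application at same address: $appAddress")
      return
    }
    addressToApp(appAddress) = app
  }

  def registeredApps: Int = addressToApp.size
}

object MiniMasterDemo {
  def main(args: Array[String]): Unit = {
    val master = new MiniMaster
    val app = AppInfo("app-001", "akka.tcp://driver@10.0.0.5:7077")
    master.registerApplication(app)
    master.registerApplication(app) // duplicate, ignored
    println(master.registeredApps)  // 1
  }
}
```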
spark git commit: [SPARK-5691] Fixing wrong data structure lookup for dupe app registratio...
Repository: spark Updated Branches: refs/heads/master dae216147 -> 6fe70d843 [SPARK-5691] Fixing wrong data structure lookup for dupe app registratio... In Master's registerApplication method, it checks if the application had already registered by examining the addressToWorker hash map. In reality, it should refer to the addressToApp data structure, as this is what really tracks which apps have been registered. Author: mcheah Closes #4477 from mccheah/spark-5691 and squashes the following commits: efdc573 [mcheah] [SPARK-5691] Fixing wrong data structure lookup for dupe app registration Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fe70d84 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fe70d84 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fe70d84 Branch: refs/heads/master Commit: 6fe70d8432314f0b7290a66f114306f61e0a87cc Parents: dae2161 Author: mcheah Authored: Mon Feb 9 13:20:14 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 13:20:14 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6fe70d84/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b8b1a25..53e4539 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -671,7 +671,7 @@ private[spark] class Master( def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.path.address -if (addressToWorker.contains(appAddress)) { +if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5691] Fixing wrong data structure lookup for dupe app registration
Repository: spark Updated Branches: refs/heads/branch-1.0 444ccdd80 -> f74bccbe3 [SPARK-5691] Fixing wrong data structure lookup for dupe app registration In Master's registerApplication method, it checks if the application had already registered by examining the addressToWorker hash map. In reality, it should refer to the addressToApp data structure, as this is what really tracks which apps have been registered. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f74bccbe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f74bccbe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f74bccbe Branch: refs/heads/branch-1.0 Commit: f74bccbe3d38a316a86671a5699aec0636ea1b73 Parents: 444ccdd Author: mcheah Authored: Mon Feb 9 19:58:58 2015 +0100 Committer: Andrew Or Committed: Mon Feb 9 13:21:35 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f74bccbe/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f4f7f54..bee3962 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -608,7 +608,7 @@ private[spark] class Master( def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.path.address -if (addressToWorker.contains(appAddress)) { +if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5691] Fixing wrong data structure lookup for dupe app registration
Repository: spark Updated Branches: refs/heads/branch-1.2 4bad85485 -> 97541b22e [SPARK-5691] Fixing wrong data structure lookup for dupe app registration In Master's registerApplication method, it checks if the application had already registered by examining the addressToWorker hash map. In reality, it should refer to the addressToApp data structure, as this is what really tracks which apps have been registered. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97541b22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97541b22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97541b22 Branch: refs/heads/branch-1.2 Commit: 97541b22ea4a98ee8001f3069c1c4673c9582d78 Parents: 4bad854 Author: mcheah Authored: Mon Feb 9 19:58:58 2015 +0100 Committer: Andrew Or Committed: Mon Feb 9 13:21:18 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97541b22/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 56169d2..5d20e84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -640,7 +640,7 @@ private[spark] class Master( def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.path.address -if (addressToWorker.contains(appAddress)) { +if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5691] Fixing wrong data structure lookup for dupe app registratio...
Repository: spark Updated Branches: refs/heads/branch-1.3 43972b5d1 -> 6a0144c63 [SPARK-5691] Fixing wrong data structure lookup for dupe app registratio... In Master's registerApplication method, it checks if the application had already registered by examining the addressToWorker hash map. In reality, it should refer to the addressToApp data structure, as this is what really tracks which apps have been registered. Author: mcheah Closes #4477 from mccheah/spark-5691 and squashes the following commits: efdc573 [mcheah] [SPARK-5691] Fixing wrong data structure lookup for dupe app registration (cherry picked from commit 6fe70d8432314f0b7290a66f114306f61e0a87cc) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a0144c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a0144c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a0144c6 Branch: refs/heads/branch-1.3 Commit: 6a0144c63414bb3bbf181810dbe317d727ace201 Parents: 43972b5 Author: mcheah Authored: Mon Feb 9 13:20:14 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 13:20:19 2015 -0800 -- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a0144c6/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b8b1a25..53e4539 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -671,7 +671,7 @@ private[spark] class Master( def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.path.address -if (addressToWorker.contains(appAddress)) { +if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r1658576 - in /spark: ./ js/ site/ site/docs/ site/docs/1.2.1/ site/docs/1.2.1/api/ site/docs/1.2.1/api/java/ site/docs/1.2.1/api/java/org/ site/docs/1.2.1/api/java/org/apache/ site/docs/1
Author: pwendell Date: Mon Feb 9 21:16:41 2015 New Revision: 1658576 URL: http://svn.apache.org/r1658576 Log: Adding Spark 1.2.1 release [This commit notification would consist of 763 parts, which exceeds the limit of 50, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5678] Convert DataFrame to pandas.DataFrame and Series
Repository: spark Updated Branches: refs/heads/branch-1.3 fa67877c2 -> 43972b5d1 [SPARK-5678] Convert DataFrame to pandas.DataFrame and Series ``` pyspark.sql.DataFrame.to_pandas = to_pandas(self) unbound pyspark.sql.DataFrame method Collect all the rows and return a `pandas.DataFrame`. >>> df.to_pandas() # doctest: +SKIP age name 02 Alice 15Bob pyspark.sql.Column.to_pandas = to_pandas(self) unbound pyspark.sql.Column method Return a pandas.Series from the column >>> df.age.to_pandas() # doctest: +SKIP 02 15 dtype: int64 ``` Not tests by jenkins (they depends on pandas) Author: Davies Liu Closes #4476 from davies/to_pandas and squashes the following commits: 6276fb6 [Davies Liu] Convert DataFrame to pandas.DataFrame and Series (cherry picked from commit afb131637d96e1e5e07eb8abf24e32e7f3b2304d) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43972b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43972b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43972b5d Branch: refs/heads/branch-1.3 Commit: 43972b5d19b0013ab36324129362a4a2c12f41b3 Parents: fa67877 Author: Davies Liu Authored: Mon Feb 9 11:42:52 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 11:49:01 2015 -0800 -- python/pyspark/sql.py | 25 + 1 file changed, 25 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43972b5d/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e55f285..6a6dfbc 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -2284,6 +2284,18 @@ class DataFrame(object): """ return self.select('*', col.alias(colName)) +def to_pandas(self): +""" +Collect all the rows and return a `pandas.DataFrame`. + +>>> df.to_pandas() # doctest: +SKIP + age name +02 Alice +15Bob +""" +import pandas as pd +return pd.DataFrame.from_records(self.collect(), columns=self.columns) + # Having SchemaRDD for backward compatibility (for docs) class SchemaRDD(DataFrame): @@ -2551,6 +2563,19 @@ class Column(DataFrame): jc = self._jc.cast(jdt) return Column(jc, self.sql_ctx) +def to_pandas(self): +""" +Return a pandas.Series from the column + +>>> df.age.to_pandas() # doctest: +SKIP +02 +15 +dtype: int64 +""" +import pandas as pd +data = [c for c, in self.collect()] +return pd.Series(data) + def _aggregate_func(name, doc=""): """ Create a function for aggregator by name""" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5664][BUILD] Restore stty settings when exiting from SBT's spark-shell
Repository: spark Updated Branches: refs/heads/master afb131637 -> dae216147 [SPARK-5664][BUILD] Restore stty settings when exiting from SBT's spark-shell For launching spark-shell from SBT. Author: Liang-Chi Hsieh Closes #4451 from viirya/restore_stty and squashes the following commits: fdfc480 [Liang-Chi Hsieh] Restore stty settings when exit (for launching spark-shell from SBT). Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dae21614 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dae21614 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dae21614 Branch: refs/heads/master Commit: dae216147f2247fd722fb0909da74fe71cf2fa8b Parents: afb1316 Author: Liang-Chi Hsieh Authored: Mon Feb 9 11:45:12 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 11:45:12 2015 -0800 -- build/sbt | 28 build/sbt-launch-lib.bash | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dae21614/build/sbt -- diff --git a/build/sbt b/build/sbt index 28ebb64..cc3203d 100755 --- a/build/sbt +++ b/build/sbt @@ -125,4 +125,32 @@ loadConfigFile() { [[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@" [[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@" +exit_status=127 +saved_stty="" + +restoreSttySettings() { + stty $saved_stty + saved_stty="" +} + +onExit() { + if [[ "$saved_stty" != "" ]]; then +restoreSttySettings + fi + exit $exit_status +} + +saveSttySettings() { + saved_stty=$(stty -g 2>/dev/null) + if [[ ! $? ]]; then +saved_stty="" + fi +} + +saveSttySettings +trap onExit INT + run "$@" + +exit_status=$? +onExit http://git-wip-us.apache.org/repos/asf/spark/blob/dae21614/build/sbt-launch-lib.bash -- diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 5e0c640..504be48 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -81,7 +81,7 @@ execRunner () { echo "" } - exec "$@" + "$@" } addJava () { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5664][BUILD] Restore stty settings when exiting from SBT's spark-shell
Repository: spark Updated Branches: refs/heads/branch-1.3 c88d4ab1d -> fa67877c2 [SPARK-5664][BUILD] Restore stty settings when exiting from SBT's spark-shell For launching spark-shell from SBT. Author: Liang-Chi Hsieh Closes #4451 from viirya/restore_stty and squashes the following commits: fdfc480 [Liang-Chi Hsieh] Restore stty settings when exit (for launching spark-shell from SBT). (cherry picked from commit dae216147f2247fd722fb0909da74fe71cf2fa8b) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa67877c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa67877c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa67877c Branch: refs/heads/branch-1.3 Commit: fa67877c2323122d4ee10ea9c4175ef0473cedcc Parents: c88d4ab Author: Liang-Chi Hsieh Authored: Mon Feb 9 11:45:12 2015 -0800 Committer: Michael Armbrust Committed: Mon Feb 9 11:45:21 2015 -0800 -- build/sbt | 28 build/sbt-launch-lib.bash | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa67877c/build/sbt -- diff --git a/build/sbt b/build/sbt index 28ebb64..cc3203d 100755 --- a/build/sbt +++ b/build/sbt @@ -125,4 +125,32 @@ loadConfigFile() { [[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@" [[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@" +exit_status=127 +saved_stty="" + +restoreSttySettings() { + stty $saved_stty + saved_stty="" +} + +onExit() { + if [[ "$saved_stty" != "" ]]; then +restoreSttySettings + fi + exit $exit_status +} + +saveSttySettings() { + saved_stty=$(stty -g 2>/dev/null) + if [[ ! $? ]]; then +saved_stty="" + fi +} + +saveSttySettings +trap onExit INT + run "$@" + +exit_status=$? +onExit http://git-wip-us.apache.org/repos/asf/spark/blob/fa67877c/build/sbt-launch-lib.bash -- diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 5e0c640..504be48 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -81,7 +81,7 @@ execRunner () { echo "" } - exec "$@" + "$@" } addJava () { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5678] Convert DataFrame to pandas.DataFrame and Series
Repository: spark Updated Branches: refs/heads/master de7806048 -> afb131637 [SPARK-5678] Convert DataFrame to pandas.DataFrame and Series ``` pyspark.sql.DataFrame.to_pandas = to_pandas(self) unbound pyspark.sql.DataFrame method Collect all the rows and return a `pandas.DataFrame`. >>> df.to_pandas() # doctest: +SKIP age name 02 Alice 15Bob pyspark.sql.Column.to_pandas = to_pandas(self) unbound pyspark.sql.Column method Return a pandas.Series from the column >>> df.age.to_pandas() # doctest: +SKIP 02 15 dtype: int64 ``` Not tests by jenkins (they depends on pandas) Author: Davies Liu Closes #4476 from davies/to_pandas and squashes the following commits: 6276fb6 [Davies Liu] Convert DataFrame to pandas.DataFrame and Series Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afb13163 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afb13163 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afb13163 Branch: refs/heads/master Commit: afb131637d96e1e5e07eb8abf24e32e7f3b2304d Parents: de78060 Author: Davies Liu Authored: Mon Feb 9 11:42:52 2015 -0800 Committer: Reynold Xin Committed: Mon Feb 9 11:42:52 2015 -0800 -- python/pyspark/sql.py | 25 + 1 file changed, 25 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afb13163/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e55f285..6a6dfbc 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -2284,6 +2284,18 @@ class DataFrame(object): """ return self.select('*', col.alias(colName)) +def to_pandas(self): +""" +Collect all the rows and return a `pandas.DataFrame`. + +>>> df.to_pandas() # doctest: +SKIP + age name +02 Alice +15Bob +""" +import pandas as pd +return pd.DataFrame.from_records(self.collect(), columns=self.columns) + # Having SchemaRDD for backward compatibility (for docs) class SchemaRDD(DataFrame): @@ -2551,6 +2563,19 @@ class Column(DataFrame): jc = self._jc.cast(jdt) return Column(jc, self.sql_ctx) +def to_pandas(self): +""" +Return a pandas.Series from the column + +>>> df.age.to_pandas() # doctest: +SKIP +02 +15 +dtype: int64 +""" +import pandas as pd +data = [c for c, in self.collect()] +return pd.Series(data) + def _aggregate_func(name, doc=""): """ Create a function for aggregator by name""" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-3242 [EC2] Spark 1.0.2 ec2 scripts creates clusters with Spark 1.0.1 installed by default
Repository: spark Updated Branches: refs/heads/branch-1.0 4b9234905 -> 444ccdd80 SPARK-3242 [EC2] Spark 1.0.2 ec2 scripts creates clusters with Spark 1.0.1 installed by default tdas you recorded this as a blocker to-do for branch 1.0. Seemed easy, so here's a PR? Author: Sean Owen Closes #4458 from srowen/SPARK-3242 and squashes the following commits: 58a5ede [Sean Owen] Update Spark version in ec2 script to 1.0.3 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/444ccdd8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/444ccdd8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/444ccdd8 Branch: refs/heads/branch-1.0 Commit: 444ccdd80ec5df249978d8498b4fc501cc3429d7 Parents: 4b92349 Author: Sean Owen Authored: Mon Feb 9 10:42:17 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 10:42:17 2015 -0800 -- ec2/spark_ec2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/444ccdd8/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 348277a..a27910c 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -70,7 +70,7 @@ def parse_args(): "slaves across multiple (an additional $0.01/Gb for bandwidth" + "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") - parser.add_option("-v", "--spark-version", default="1.0.1", + parser.add_option("-v", "--spark-version", default="1.0.3", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", default="https://github.com/apache/spark";, @@ -164,7 +164,8 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", -"0.9.1": "0.9.1", "1.0.0": "1.0.0", "1.0.1": "1.0.0", "1.0.2": "1.0.0"} +"0.9.1": "0.9.1", "1.0.0": "1.0.0", "1.0.1": "1.0.0", "1.0.2": "1.0.0", +"1.0.3": "1.0.0"} version = opts.spark_version.replace("v", "") if version not in spark_shark_map: print >> stderr, "Don't know about Spark version: %s" % version - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-4267 [YARN] Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later
Repository: spark Updated Branches: refs/heads/branch-1.3 f2aa7b757 -> c88d4ab1d SPARK-4267 [YARN] Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later Before passing to YARN, escape arguments in "extraJavaOptions" args, in order to correctly handle cases like -Dfoo="one two three". Also standardize how these args are handled and ensure that individual args are treated as stand-alone args, not one string. vanzin andrewor14 Author: Sean Owen Closes #4452 from srowen/SPARK-4267.2 and squashes the following commits: c8297d2 [Sean Owen] Before passing to YARN, escape arguments in "extraJavaOptions" args, in order to correctly handle cases like -Dfoo="one two three". Also standardize how these args are handled and ensure that individual args are treated as stand-alone args, not one string. (cherry picked from commit de7806048ac49a8bfdf44d8f87bc11cea1dfb242) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c88d4ab1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c88d4ab1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c88d4ab1 Branch: refs/heads/branch-1.3 Commit: c88d4ab1ddf025335c8b0ebd99de90cdacf09fed Parents: f2aa7b7 Author: Sean Owen Authored: Mon Feb 9 10:33:57 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 10:34:06 2015 -0800 -- .../org/apache/spark/deploy/yarn/Client.scala | 9 + .../spark/deploy/yarn/ExecutorRunnable.scala | 17 + .../spark/deploy/yarn/YarnClusterSuite.scala | 6 -- 3 files changed, 18 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c88d4ab1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e700509..8afc1cc 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -435,10 +435,11 @@ private[spark] class Client( // Include driver-specific java options if we are launching a driver if (isClusterMode) { - sparkConf.getOption("spark.driver.extraJavaOptions") + val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions") .orElse(sys.env.get("SPARK_JAVA_OPTS")) -.map(Utils.splitCommandString).getOrElse(Seq.empty) -.foreach(opts => javaOpts += opts) + driverOpts.foreach { opts => +javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"), sys.props.get("spark.driver.libraryPath")).flatten if (libraryPaths.nonEmpty) { @@ -460,7 +461,7 @@ private[spark] class Client( val msg = s"$amOptsKey is not allowed to alter memory settings (was '$opts')." 
throw new SparkException(msg) } -javaOpts ++= Utils.splitCommandString(opts) +javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c88d4ab1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 408cf09..7cd8c5f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -128,14 +128,15 @@ class ExecutorRunnable( // Set the JVM memory val executorMemoryString = executorMemory + "m" -javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " +javaOpts += "-Xms" + executorMemoryString +javaOpts += "-Xmx" + executorMemoryString // Set extra Java options for the executor, if defined sys.props.get("spark.executor.extraJavaOptions").foreach { opts => - javaOpts += opts + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } sys.env.get("SPARK_JAVA_OPTS").foreach { opts => - javaOpts += opts + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } sys.props.get("spark.executor.extraLibraryPath").foreach { p => prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p))) @@ -173,11 +174,11 @@ class ExecutorRunnable( // The options are based on // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.ht
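The core of the change is splitting `extraJavaOptions` into individual arguments and shell-escaping each one before it reaches YARN, so a value like `-Dfoo="one two three"` stays a single JVM option. A standalone sketch of that split-then-escape step, using deliberately simplified stand-in helpers (the real `Utils.splitCommandString` and `YarnSparkHadoopUtil.escapeForShell` handle more edge cases):

```scala
// Sketch of split-then-escape for extraJavaOptions (simplified, hypothetical
// helpers). Quoted whitespace is preserved inside a single argument, and each
// argument is then escaped so the shell command YARN builds keeps it intact.
object EscapeSketch {
  // Naive splitter: whitespace-separated tokens; whitespace inside double
  // quotes is kept and the quote characters themselves are dropped.
  def splitCommandString(s: String): Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer[String]()
    val cur = new StringBuilder
    var inQuotes = false
    for (c <- s) {
      if (c == '"') inQuotes = !inQuotes
      else if (c.isWhitespace && !inQuotes) {
        if (cur.nonEmpty) { out += cur.toString; cur.clear() }
      } else cur += c
    }
    if (cur.nonEmpty) out += cur.toString
    out.toSeq
  }

  // Wrap each argument in single quotes for the downstream shell invocation.
  def escapeForShell(arg: String): String =
    "'" + arg.replace("'", "'\\''") + "'"

  def main(args: Array[String]): Unit = {
    val opts = "-XX:+UseG1GC -Dfoo=\"one two three\""
    val javaOpts = splitCommandString(opts).map(escapeForShell)
    javaOpts.foreach(println)
    // '-XX:+UseG1GC'
    // '-Dfoo=one two three'
  }
}
```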
spark git commit: SPARK-4267 [YARN] Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later
Repository: spark Updated Branches: refs/heads/master 0793ee1b4 -> de7806048 SPARK-4267 [YARN] Failing to launch jobs on Spark on YARN with Hadoop 2.5.0 or later Before passing to YARN, escape arguments in "extraJavaOptions" args, in order to correctly handle cases like -Dfoo="one two three". Also standardize how these args are handled and ensure that individual args are treated as stand-alone args, not one string. vanzin andrewor14 Author: Sean Owen Closes #4452 from srowen/SPARK-4267.2 and squashes the following commits: c8297d2 [Sean Owen] Before passing to YARN, escape arguments in "extraJavaOptions" args, in order to correctly handle cases like -Dfoo="one two three". Also standardize how these args are handled and ensure that individual args are treated as stand-alone args, not one string. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de780604 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de780604 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de780604 Branch: refs/heads/master Commit: de7806048ac49a8bfdf44d8f87bc11cea1dfb242 Parents: 0793ee1 Author: Sean Owen Authored: Mon Feb 9 10:33:57 2015 -0800 Committer: Andrew Or Committed: Mon Feb 9 10:33:57 2015 -0800 -- .../org/apache/spark/deploy/yarn/Client.scala | 9 + .../spark/deploy/yarn/ExecutorRunnable.scala | 17 + .../spark/deploy/yarn/YarnClusterSuite.scala | 6 -- 3 files changed, 18 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de780604/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e700509..8afc1cc 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -435,10 +435,11 @@ private[spark] class Client( // Include driver-specific java options if we are launching a driver if (isClusterMode) { - sparkConf.getOption("spark.driver.extraJavaOptions") + val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions") .orElse(sys.env.get("SPARK_JAVA_OPTS")) -.map(Utils.splitCommandString).getOrElse(Seq.empty) -.foreach(opts => javaOpts += opts) + driverOpts.foreach { opts => +javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"), sys.props.get("spark.driver.libraryPath")).flatten if (libraryPaths.nonEmpty) { @@ -460,7 +461,7 @@ private[spark] class Client( val msg = s"$amOptsKey is not allowed to alter memory settings (was '$opts')." 
throw new SparkException(msg) } -javaOpts ++= Utils.splitCommandString(opts) +javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } } http://git-wip-us.apache.org/repos/asf/spark/blob/de780604/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 408cf09..7cd8c5f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -128,14 +128,15 @@ class ExecutorRunnable( // Set the JVM memory val executorMemoryString = executorMemory + "m" -javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " +javaOpts += "-Xms" + executorMemoryString +javaOpts += "-Xmx" + executorMemoryString // Set extra Java options for the executor, if defined sys.props.get("spark.executor.extraJavaOptions").foreach { opts => - javaOpts += opts + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } sys.env.get("SPARK_JAVA_OPTS").foreach { opts => - javaOpts += opts + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } sys.props.get("spark.executor.extraLibraryPath").foreach { p => prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p))) @@ -173,11 +174,11 @@ class ExecutorRunnable( // The options are based on // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use // %20the%20Concurrent%20Low%20Pause%20Collector|outline -
spark git commit: SPARK-2149. [MLLIB] Univariate kernel density estimation
Repository: spark Updated Branches: refs/heads/master 4dfe180fc -> 0793ee1b4 SPARK-2149. [MLLIB] Univariate kernel density estimation Author: Sandy Ryza Closes #1093 from sryza/sandy-spark-2149 and squashes the following commits: 5f06b33 [Sandy Ryza] More review comments 0f73060 [Sandy Ryza] Respond to Sean's review comments 0dfa005 [Sandy Ryza] SPARK-2149. Univariate kernel density estimation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0793ee1b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0793ee1b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0793ee1b Branch: refs/heads/master Commit: 0793ee1b4dea1f4b0df749e8ad7c1ab70b512faf Parents: 4dfe180 Author: Sandy Ryza Authored: Mon Feb 9 10:12:12 2015 + Committer: Sean Owen Committed: Mon Feb 9 10:12:12 2015 + -- .../apache/spark/mllib/stat/KernelDensity.scala | 71 .../apache/spark/mllib/stat/Statistics.scala| 14 .../spark/mllib/stat/KernelDensitySuite.scala | 47 + 3 files changed, 132 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0793ee1b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala new file mode 100644 index 000..0deef11 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/KernelDensity.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat + +import org.apache.spark.rdd.RDD + +private[stat] object KernelDensity { + /** + * Given a set of samples from a distribution, estimates its density at the set of given points. + * Uses a Gaussian kernel with the given standard deviation. 
+ */ + def estimate(samples: RDD[Double], standardDeviation: Double, + evaluationPoints: Array[Double]): Array[Double] = { +if (standardDeviation <= 0.0) { + throw new IllegalArgumentException("Standard deviation must be positive") +} + +// This gets used in each Gaussian PDF computation, so compute it up front +val logStandardDeviationPlusHalfLog2Pi = + Math.log(standardDeviation) + 0.5 * Math.log(2 * Math.PI) + +val (points, count) = samples.aggregate((new Array[Double](evaluationPoints.length), 0))( + (x, y) => { +var i = 0 +while (i < evaluationPoints.length) { + x._1(i) += normPdf(y, standardDeviation, logStandardDeviationPlusHalfLog2Pi, +evaluationPoints(i)) + i += 1 +} +(x._1, i) + }, + (x, y) => { +var i = 0 +while (i < evaluationPoints.length) { + x._1(i) += y._1(i) + i += 1 +} +(x._1, x._2 + y._2) + }) + +var i = 0 +while (i < points.length) { + points(i) /= count + i += 1 +} +points + } + + private def normPdf(mean: Double, standardDeviation: Double, + logStandardDeviationPlusHalfLog2Pi: Double, x: Double): Double = { +val x0 = x - mean +val x1 = x0 / standardDeviation +val logDensity = -0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi +Math.exp(logDensity) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/0793ee1b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index b3fad0c..3256162 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -149,4 +149,18 @@ object Statistics { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } + + /** + * Given an empirical d
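The estimator added here is a plain Gaussian kernel density estimate: the density at each evaluation point is the average of Gaussian PDFs centred on the samples, with the supplied standard deviation as bandwidth. A plain-Scala sketch of that computation (no RDDs; the committed code aggregates the same per-point sums across partitions):

```scala
// Plain-Scala sketch of Gaussian kernel density estimation. density(p) is the
// mean over samples s of N(p; mean = s, sd), matching the commit's normPdf logic.
object KernelDensitySketch {
  def normPdf(mean: Double, sd: Double, x: Double): Double = {
    val z = (x - mean) / sd
    math.exp(-0.5 * z * z) / (sd * math.sqrt(2 * math.Pi))
  }

  def estimate(samples: Seq[Double], sd: Double, points: Array[Double]): Array[Double] = {
    require(sd > 0.0, "Standard deviation must be positive")
    points.map(p => samples.map(s => normPdf(s, sd, p)).sum / samples.size)
  }

  def main(args: Array[String]): Unit = {
    val samples = Seq(-1.0, 0.0, 1.0, 2.0)
    val densities = estimate(samples, sd = 1.0, points = Array(0.0, 1.5))
    densities.foreach(println)
  }
}
```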
spark git commit: [SPARK-5473] [EC2] Expose SSH failures after status checks pass
Repository: spark Updated Branches: refs/heads/master 855d12ac0 -> 4dfe180fc [SPARK-5473] [EC2] Expose SSH failures after status checks pass If there is some fatal problem with launching a cluster, `spark-ec2` just hangs without giving the user useful feedback on what the problem is. This PR exposes the output of the SSH calls to the user if the SSH test fails during cluster launch for any reason but the instance status checks are all green. It also removes the growing trail of dots while waiting in favor of a fixed 3 dots. For example: ``` $ ./ec2/spark-ec2 -k key -i /incorrect/path/identity.pem --instance-type m3.medium --slaves 1 --zone us-east-1c launch "spark-test" Setting up security groups... Searching for existing cluster spark-test... Spark AMI: ami-35b1885c Launching instances... Launched 1 slaves in us-east-1c, regid = r-7dadd096 Launched master in us-east-1c, regid = r-fcadd017 Waiting for cluster to enter 'ssh-ready' state... Warning: SSH connection error. (This could be temporary.) Host: 127.0.0.1 SSH return code: 255 SSH output: Warning: Identity file /incorrect/path/identity.pem not accessible: No such file or directory. Warning: Permanently added '127.0.0.1' (RSA) to the list of known hosts. Permission denied (publickey). ``` This should give users enough information when some unrecoverable error occurs during launch so they can know to abort the launch. This will help avoid situations like the ones reported [here on Stack Overflow](http://stackoverflow.com/q/28002443/) and [here on the user list](http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3C1422323829398-21381.postn3.nabble.com%3E), where the users couldn't tell what the problem was because it was being hidden by `spark-ec2`. This is a usability improvement that should be backported to 1.2. Resolves [SPARK-5473](https://issues.apache.org/jira/browse/SPARK-5473). Author: Nicholas Chammas Closes #4262 from nchammas/expose-ssh-failure and squashes the following commits: 8bda6ed [Nicholas Chammas] default to print SSH output 2b92534 [Nicholas Chammas] show SSH output after status check pass Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dfe180f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dfe180f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dfe180f Branch: refs/heads/master Commit: 4dfe180fc893bee1146161f8b2a6efd4d6d2bb8c Parents: 855d12a Author: Nicholas Chammas Authored: Mon Feb 9 09:44:53 2015 + Committer: Sean Owen Committed: Mon Feb 9 09:44:53 2015 + -- ec2/spark_ec2.py | 36 1 file changed, 24 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4dfe180f/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 725b1e4..87b2112 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -34,6 +34,7 @@ import subprocess import sys import tarfile import tempfile +import textwrap import time import urllib2 import warnings @@ -681,21 +682,32 @@ def setup_spark_cluster(master, opts): print "Ganglia started at http://%s:5080/ganglia"; % master -def is_ssh_available(host, opts): +def is_ssh_available(host, opts, print_ssh_output=True): """ Check if SSH is available on a host. 
""" -try: -with open(os.devnull, 'w') as devnull: -ret = subprocess.check_call( -ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', - '%s@%s' % (opts.user, host), stringify_command('true')], -stdout=devnull, -stderr=devnull -) -return ret == 0 -except subprocess.CalledProcessError as e: -return False +s = subprocess.Popen( +ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), stringify_command('true')], +stdout=subprocess.PIPE, +stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order +) +cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout + +if s.returncode != 0 and print_ssh_output: +# extra leading newline is for spacing in wait_for_cluster_state() +print textwrap.dedent("""\n +Warning: SSH connection error. (This could be temporary.) +Host: {h} +SSH return code: {r} +SSH output: {o} +""").format( +h=host, +r=s.returncode, +o=cmd_output.strip() +) + +return s.returncode == 0 def is_cluster_ssh_available(cluster_instances, opts): -
spark git commit: [SPARK-5473] [EC2] Expose SSH failures after status checks pass
Repository: spark Updated Branches: refs/heads/branch-1.3 5782ee29e -> f2aa7b757 [SPARK-5473] [EC2] Expose SSH failures after status checks pass If there is some fatal problem with launching a cluster, `spark-ec2` just hangs without giving the user useful feedback on what the problem is. This PR exposes the output of the SSH calls to the user if the SSH test fails during cluster launch for any reason but the instance status checks are all green. It also removes the growing trail of dots while waiting in favor of a fixed 3 dots. For example: ``` $ ./ec2/spark-ec2 -k key -i /incorrect/path/identity.pem --instance-type m3.medium --slaves 1 --zone us-east-1c launch "spark-test" Setting up security groups... Searching for existing cluster spark-test... Spark AMI: ami-35b1885c Launching instances... Launched 1 slaves in us-east-1c, regid = r-7dadd096 Launched master in us-east-1c, regid = r-fcadd017 Waiting for cluster to enter 'ssh-ready' state... Warning: SSH connection error. (This could be temporary.) Host: 127.0.0.1 SSH return code: 255 SSH output: Warning: Identity file /incorrect/path/identity.pem not accessible: No such file or directory. Warning: Permanently added '127.0.0.1' (RSA) to the list of known hosts. Permission denied (publickey). ``` This should give users enough information when some unrecoverable error occurs during launch so they can know to abort the launch. This will help avoid situations like the ones reported [here on Stack Overflow](http://stackoverflow.com/q/28002443/) and [here on the user list](http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3C1422323829398-21381.postn3.nabble.com%3E), where the users couldn't tell what the problem was because it was being hidden by `spark-ec2`. This is a usability improvement that should be backported to 1.2. Resolves [SPARK-5473](https://issues.apache.org/jira/browse/SPARK-5473). Author: Nicholas Chammas Closes #4262 from nchammas/expose-ssh-failure and squashes the following commits: 8bda6ed [Nicholas Chammas] default to print SSH output 2b92534 [Nicholas Chammas] show SSH output after status check pass (cherry picked from commit 4dfe180fc893bee1146161f8b2a6efd4d6d2bb8c) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2aa7b75 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2aa7b75 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2aa7b75 Branch: refs/heads/branch-1.3 Commit: f2aa7b7572735019c56091e770c14ccba3d95833 Parents: 5782ee2 Author: Nicholas Chammas Authored: Mon Feb 9 09:44:53 2015 + Committer: Sean Owen Committed: Mon Feb 9 09:45:03 2015 + -- ec2/spark_ec2.py | 36 1 file changed, 24 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2aa7b75/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 3f7242a..ee45dd3 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -32,6 +32,7 @@ import subprocess import sys import tarfile import tempfile +import textwrap import time import urllib2 import warnings @@ -678,21 +679,32 @@ def setup_spark_cluster(master, opts): print "Ganglia started at http://%s:5080/ganglia"; % master -def is_ssh_available(host, opts): +def is_ssh_available(host, opts, print_ssh_output=True): """ Check if SSH is available on a host. 
""" -try: -with open(os.devnull, 'w') as devnull: -ret = subprocess.check_call( -ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', - '%s@%s' % (opts.user, host), stringify_command('true')], -stdout=devnull, -stderr=devnull -) -return ret == 0 -except subprocess.CalledProcessError as e: -return False +s = subprocess.Popen( +ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), stringify_command('true')], +stdout=subprocess.PIPE, +stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order +) +cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout + +if s.returncode != 0 and print_ssh_output: +# extra leading newline is for spacing in wait_for_cluster_state() +print textwrap.dedent("""\n +Warning: SSH connection error. (This could be temporary.) +Host: {h} +SSH return code: {r} +SSH output: {o} +""").format( +h=host, +r=s.returncode, +o=cmd_output.strip() +) + +return s.returncode