git commit: SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors
Repository: spark Updated Branches: refs/heads/master 2b7ab814f -> 092e2f152 SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master from removing an Application with RUNNING Executors. Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits over the entire lifetime of the Application, allow that many since any Executor successfully began running the Application; 2) Don't remove the Application while Master still thinks that there are RUNNING Executors. This should be fine as long as the ApplicationInfo doesn't believe any Executors are forever RUNNING when they are not. I think that any non-RUNNING Executors will eventually no longer be RUNNING in Master's accounting, but another set of eyes should confirm that. This PR also doesn't try to detect which nodes have gone rogue or to kill off bad Workers, so repeatedly failing Executors will continue to fail and fill up log files with failure reports as long as the Application keeps running. Author: Mark Hamstra Closes #1360 from markhamstra/SPARK-2425 and squashes the following commits: f099c0b [Mark Hamstra] Reuse appInfo b2b7b25 [Mark Hamstra] Moved 'Application failed' logging bdd0928 [Mark Hamstra] switched to string interpolation 1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/092e2f15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/092e2f15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/092e2f15 Branch: refs/heads/master Commit: 092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33 Parents: 2b7ab81 Author: Mark Hamstra Authored: Mon Sep 8 20:51:56 2014 -0700 Committer: Andrew Or Committed: Mon Sep 8 20:51:56 2014 -0700 -- .../spark/deploy/master/ApplicationInfo.scala | 4 ++- .../org/apache/spark/deploy/master/Master.scala | 26 .../spark/deploy/worker/ExecutorRunner.scala| 2 ++ .../org/apache/spark/deploy/worker/Worker.scala | 2 +- 4 files changed, 22 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/092e2f15/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index d367442..c3ca43f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -96,11 +96,13 @@ private[spark] class ApplicationInfo( def retryCount = _retryCount - def incrementRetryCount = { + def incrementRetryCount() = { _retryCount += 1 _retryCount } + def resetRetryCount() = _retryCount = 0 + def markFinished(endState: ApplicationState.Value) { state = endState endTime = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/spark/blob/092e2f15/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2a66fcf..a3909d6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -296,28 +296,34 @@ private[spark] class Master( val 
execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { + val appInfo = idToApp(appId) exec.state = state + if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { -val appInfo = idToApp(appId) // Remove this executor from the worker and app -logInfo("Removing executor " + exec.fullId + " because it is " + state) +logInfo(s"Removing executor ${exec.fullId} because it is $state") appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) -val normalExit = exitStatus.exists(_ == 0) +val normalExit = exitStatus == Some(0) // Only retry certain number of times so we don't go into an infinite loop. -if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM
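The retry accounting described above can be summarized in a short standalone sketch. This is a simplified model, not the Master code: the class and method names are illustrative, and the limit of 10 is only a stand-in for ApplicationState.MAX_NUM_RETRY. It shows the two changes together: an executor reaching RUNNING resets the failure counter, and the application is removed only when the counter passes the limit while no executor is still RUNNING.

  // Simplified, illustrative model of the SPARK-2425 retry accounting.
  // Not Spark code: names and the limit of 10 are assumptions for this sketch.
  object RetryAccountingSketch {
    val MaxNumRetry = 10

    class AppRetryTracker {
      private var retryCount = 0

      // An executor reported RUNNING: a successful launch resets the window
      // of tolerated failures (change 1 in the commit message).
      def onExecutorRunning(): Unit = retryCount = 0

      // An executor exited abnormally; returns true only if too many failures
      // piled up since the last successful launch AND no executor is still
      // RUNNING (change 2 in the commit message).
      def onExecutorFailed(hasRunningExecutors: Boolean): Boolean = {
        retryCount += 1
        retryCount >= MaxNumRetry && !hasRunningExecutors
      }
    }

    def main(args: Array[String]): Unit = {
      val tracker = new AppRetryTracker
      (1 to 9).foreach(_ => tracker.onExecutorFailed(hasRunningExecutors = true))
      tracker.onExecutorRunning() // a healthy executor came up, so the counter resets
      val removeApp = tracker.onExecutorFailed(hasRunningExecutors = false)
      println(s"remove application? $removeApp") // false: only one failure since the reset
    }
  }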
git commit: [SPARK-3329][SQL] Don't depend on Hive SET pair ordering in tests.
Repository: spark Updated Branches: refs/heads/master dc1dbf206 -> 2b7ab814f [SPARK-3329][SQL] Don't depend on Hive SET pair ordering in tests. This fixes some possible spurious test failures in `HiveQuerySuite` by comparing sets of key-value pairs as sets, rather than as lists. Author: William Benton Author: Aaron Davidson Closes #2220 from willb/spark-3329 and squashes the following commits: 3b3e205 [William Benton] Collapse collectResults case match in HiveQuerySuite 6525d8e [William Benton] Handle cases where SET returns Rows of (single) strings cf11b0e [Aaron Davidson] Fix flakey HiveQuerySuite test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b7ab814 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b7ab814 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b7ab814 Branch: refs/heads/master Commit: 2b7ab814f9bde65ebc57ebd04386e56c97f06f4a Parents: dc1dbf2 Author: William Benton Authored: Mon Sep 8 19:29:18 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 19:29:23 2014 -0700 -- .../sql/hive/execution/HiveQuerySuite.scala | 47 +++- 1 file changed, 26 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b7ab814/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 305998c..6bf8d18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -558,62 +558,67 @@ class HiveQuerySuite extends HiveComparisonTest { val testKey = "spark.sql.key.usedfortestonly" val testVal = "test.val.0" val nonexistentKey = "nonexistent" - +val KV = "([^=]+)=([^=]*)".r +def collectResults(rdd: SchemaRDD): Set[(String, String)] = + rdd.collect().map { +case Row(key: String, value: String) => key -> value +case Row(KV(key, value)) => key -> value + }.toSet clear() // "set" itself returns all config variables currently specified in SQLConf. // TODO: Should we be listing the default here always? probably... 
assert(sql("SET").collect().size == 0) -assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey=$testVal").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal)) { + collectResults(hql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) -assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey=$testVal").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal)) { + collectResults(hql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) -assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql(s"SET").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(hql("SET")) } // "set key" -assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal)) { + collectResults(hql(s"SET $testKey")) } -assertResult(Array(s"$nonexistentKey=")) { - sql(s"SET $nonexistentKey").collect().map(_.getString(0)) +assertResult(Set(nonexistentKey -> "")) { + collectResults(hql(s"SET $nonexistentKey")) } // Assert that sql() should have the same effects as sql() by repeating the above using sql(). clear() assert(sql("SET").collect().size == 0) -assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey=$testVal").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal)) { + collectResults(sql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) -assertResult(Array(s"$testKey=$testVal")) { - sql("SET").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal)) { + collectResults(sql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) -assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql("SET").collect().map(_.getString(0)) +assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(sql("SET")) } -
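The essence of the fix is comparing SET output as an unordered set of key/value pairs. The sketch below is standalone, not the test code itself: the KV regex is the one added to HiveQuerySuite, while the surrounding object and the sample data are made up for illustration.

  // Standalone sketch of order-insensitive comparison of "SET" output.
  // The KV regex mirrors the one added in HiveQuerySuite; the rest is illustrative.
  object SetComparisonSketch {
    private val KV = "([^=]+)=([^=]*)".r

    // Accepts either (key, value) pairs or single "key=value" strings and
    // normalizes both into a Set, so pair ordering can no longer cause failures.
    def toPairs(rows: Seq[Any]): Set[(String, String)] = rows.map {
      case (key: String, value: String) => key -> value
      case KV(key, value)               => key -> value
    }.toSet

    def main(args: Array[String]): Unit = {
      val fromHive = Seq("spark.sql.key.usedfortestonly=test.val.0", "nonexistent=")
      val expected = Set("nonexistent" -> "", "spark.sql.key.usedfortestonly" -> "test.val.0")
      assert(toPairs(fromHive) == expected) // passes regardless of the order Hive returns
      println("order-insensitive comparison OK")
    }
  }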
git commit: [SPARK-3414][SQL] Stores analyzed logical plan when registering a temp table
Repository: spark Updated Branches: refs/heads/master ca0348e68 -> dc1dbf206 [SPARK-3414][SQL] Stores analyzed logical plan when registering a temp table Case insensitivity breaks when unresolved relation contains attributes with uppercase letters in their names, because we store unanalyzed logical plan when registering temp tables while the `CaseInsensitivityAttributeReferences` batch runs before the `Resolution` batch. To fix this issue, we need to store analyzed logical plan. Author: Cheng Lian Closes #2293 from liancheng/spark-3414 and squashes the following commits: d9fa1d6 [Cheng Lian] Stores analyzed logical plan when registering a temp table Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc1dbf20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc1dbf20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc1dbf20 Branch: refs/heads/master Commit: dc1dbf206e0076a43ad2120d8bb5b1fc6912fe25 Parents: ca0348e Author: Cheng Lian Authored: Mon Sep 8 19:08:05 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 19:08:05 2014 -0700 -- .../scala/org/apache/spark/sql/SQLContext.scala | 4 ++-- .../sql/hive/execution/HiveQuerySuite.scala | 25 +--- 2 files changed, 24 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dc1dbf20/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5acb45c..a2f334a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -246,7 +246,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { -catalog.registerTable(None, tableName, rdd.logicalPlan) +catalog.registerTable(None, tableName, rdd.queryExecution.analyzed) } /** @@ -411,7 +411,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } -def simpleString: String = +def simpleString: String = s"""== Physical Plan == |${stringOrError(executedPlan)} """ http://git-wip-us.apache.org/repos/asf/spark/blob/dc1dbf20/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index f4217a5..305998c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,11 +17,8 @@ package org.apache.spark.sql.hive.execution -import java.io.File - import scala.util.Try -import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -514,6 +511,28 @@ class HiveQuerySuite extends HiveComparisonTest { sql("DROP TABLE alter1") } + case class LogEntry(filename: String, message: String) + case class LogFile(name: String) + + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { +sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") 
+sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") + +sql( + """ + SELECT name, message + FROM rawLogs + JOIN ( +SELECT name +FROM logFiles + ) files + ON rawLogs.filename = files.name + """).registerTempTable("boom") + +// This should be successfully analyzed +sql("SELECT * FROM boom").queryExecution.analyzed + } + test("parse HQL set commands") { // Adapted from its SQL counterpart. val testKey = "spark.sql.key.usedfortestonly"
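Why storing the analyzed plan matters can be shown with a toy model. Nothing below is Catalyst code: "analysis" is reduced to lowercasing attribute names, standing in for the CaseInsensitivityAttributeReferences batch, and the catalog is a plain map.

  // Toy model of SPARK-3414: if a temp table stores the raw (unanalyzed) plan,
  // the case-canonicalizing analysis rule never runs on it at the right time
  // and lookups on lowercase names fail. Not Catalyst code.
  object AnalyzedPlanSketch {
    case class Plan(attributes: Seq[String])

    // Stand-in for the CaseInsensitivityAttributeReferences batch.
    def analyze(plan: Plan): Plan = Plan(plan.attributes.map(_.toLowerCase))

    val catalog = scala.collection.mutable.Map.empty[String, Plan]

    // The fix is the `storeAnalyzed = true` path: register analyze(plan), not plan.
    def registerTable(name: String, plan: Plan, storeAnalyzed: Boolean): Unit =
      catalog(name) = if (storeAnalyzed) analyze(plan) else plan

    // Resolution looks attributes up by their canonical (lowercase) form.
    def canResolve(table: String, column: String): Boolean =
      catalog(table).attributes.contains(column.toLowerCase)

    def main(args: Array[String]): Unit = {
      val rawLogs = Plan(Seq("Filename", "Message")) // uppercase letters in the names
      registerTable("rawLogs", rawLogs, storeAnalyzed = false)
      println(canResolve("rawLogs", "filename")) // false: the bug
      registerTable("rawLogs", rawLogs, storeAnalyzed = true)
      println(canResolve("rawLogs", "filename")) // true: the fix
    }
  }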
git commit: SPARK-3423: [SQL] Implement BETWEEN for SQLParser
Repository: spark Updated Branches: refs/heads/master 50a4fa774 -> ca0348e68 SPARK-3423: [SQL] Implement BETWEEN for SQLParser This patch improves the SQLParser by adding support for BETWEEN conditions Author: William Benton Closes #2295 from willb/sql-between and squashes the following commits: 0016d30 [William Benton] Implement BETWEEN for SQLParser Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca0348e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca0348e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca0348e6 Branch: refs/heads/master Commit: ca0348e68213c2c7589f2018ebf9d889c0ce59c3 Parents: 50a4fa7 Author: William Benton Authored: Mon Sep 8 19:05:02 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 19:05:02 2014 -0700 -- .../org/apache/spark/sql/catalyst/SqlParser.scala | 4 .../org/apache/spark/sql/SQLQuerySuite.scala | 18 ++ 2 files changed, 22 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ca0348e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index a88bd85..bfc197c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -73,6 +73,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val ASC = Keyword("ASC") protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AVG = Keyword("AVG") + protected val BETWEEN = Keyword("BETWEEN") protected val BY = Keyword("BY") protected val CACHE = Keyword("CACHE") protected val CAST = Keyword("CAST") @@ -272,6 +273,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } | termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | +termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { + case e ~ _ ~ el ~ _ ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu)) +} | termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } | termExpression ~ REGEXP ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } | termExpression ~ LIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => Like(e1, e2) } | http://git-wip-us.apache.org/repos/asf/spark/blob/ca0348e6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e8fbc28..45c0ca8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -597,4 +597,22 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { (3, null) :: (4, 2147483644) :: Nil) } + + test("SPARK-3423 BETWEEN") { +checkAnswer( + sql("SELECT key, value FROM testData WHERE key BETWEEN 5 and 7"), + Seq((5, "5"), (6, "6"), (7, "7")) +) + +checkAnswer( + sql("SELECT key, value FROM testData WHERE key BETWEEN 7 and 7"), + Seq((7, "7")) +) + +checkAnswer( + sql("SELECT key, value FROM testData WHERE key BETWEEN 9 and 7"), + Seq() +) + + } } - To unsubscribe, e-mail: 
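The parser change rewrites `e BETWEEN lo AND hi` into a conjunction of two comparisons. Below is a standalone sketch of that desugaring with toy expression classes over integers, not the Catalyst expressions themselves.

  // Standalone sketch: e BETWEEN lo AND hi desugars to
  // And(GreaterThanOrEqual(e, lo), LessThanOrEqual(e, hi)).
  // Toy expression classes, not Catalyst's.
  object BetweenSketch {
    sealed trait Expr
    case class Literal(value: Int) extends Expr
    case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr
    case class LessThanOrEqual(left: Expr, right: Expr) extends Expr
    case class And(left: Expr, right: Expr) extends Expr

    // What the new parser production builds for "e BETWEEN lo AND hi".
    def between(e: Expr, lo: Expr, hi: Expr): Expr =
      And(GreaterThanOrEqual(e, lo), LessThanOrEqual(e, hi))

    // A tiny evaluator, only to show the rewrite has the expected semantics.
    def eval(expr: Expr): Any = expr match {
      case Literal(v)               => v
      case GreaterThanOrEqual(l, r) => eval(l).asInstanceOf[Int] >= eval(r).asInstanceOf[Int]
      case LessThanOrEqual(l, r)    => eval(l).asInstanceOf[Int] <= eval(r).asInstanceOf[Int]
      case And(l, r)                => eval(l).asInstanceOf[Boolean] && eval(r).asInstanceOf[Boolean]
    }

    def main(args: Array[String]): Unit = {
      // key BETWEEN 5 AND 7, evaluated for key = 6 and key = 9
      println(eval(between(Literal(6), Literal(5), Literal(7)))) // true
      println(eval(between(Literal(9), Literal(5), Literal(7)))) // false
    }
  }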
git commit: [SPARK-3443][MLLIB] update default values of tree:
Repository: spark Updated Branches: refs/heads/master 7db53391f -> 50a4fa774 [SPARK-3443][MLLIB] update default values of tree: Adjust the default values of decision tree, based on the memory requirement discussed in https://github.com/apache/spark/pull/2125 : 1. maxMemoryInMB: 128 -> 256 2. maxBins: 100 -> 32 3. maxDepth: 4 -> 5 (in some example code) jkbradley Author: Xiangrui Meng Closes #2322 from mengxr/tree-defaults and squashes the following commits: cda453a [Xiangrui Meng] fix tests 5900445 [Xiangrui Meng] update comments 8c81831 [Xiangrui Meng] update default values of tree: Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50a4fa77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50a4fa77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50a4fa77 Branch: refs/heads/master Commit: 50a4fa774a0e8a17d7743b33ce8941bf4041144d Parents: 7db5339 Author: Xiangrui Meng Authored: Mon Sep 8 18:59:57 2014 -0700 Committer: Xiangrui Meng Committed: Mon Sep 8 18:59:57 2014 -0700 -- docs/mllib-decision-tree.md | 16 .../spark/examples/mllib/JavaDecisionTree.java| 2 +- .../spark/examples/mllib/DecisionTreeRunner.scala | 4 ++-- .../apache/spark/mllib/tree/DecisionTree.scala| 8 .../spark/mllib/tree/configuration/Strategy.scala | 6 +++--- .../spark/mllib/tree/DecisionTreeSuite.scala | 18 -- python/pyspark/mllib/tree.py | 4 ++-- 7 files changed, 24 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/50a4fa77/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index 1166d9c..12a6afb 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -80,7 +80,7 @@ The ordered splits create "bins" and the maximum number of such bins can be specified using the `maxBins` parameter. Note that the number of bins cannot be greater than the number of instances `$N$` (a rare scenario -since the default `maxBins` value is 100). The tree algorithm automatically reduces the number of +since the default `maxBins` value is 32). The tree algorithm automatically reduces the number of bins if the condition is not satisfied. **Categorical features** @@ -117,7 +117,7 @@ all nodes at each level of the tree. This could lead to high memory requirements of the tree, potentially leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB` training parameter specifies the maximum amount of memory at the workers (twice as much at the master) to be allocated to the histogram computation. The default value is conservatively chosen to -be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements +be 256 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements for a level-wise computation cross the `maxMemoryInMB` threshold, the node training tasks at each subsequent level are split into smaller tasks. @@ -167,7 +167,7 @@ val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "gini" val maxDepth = 5 -val maxBins = 100 +val maxBins = 32 val model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins) @@ -213,7 +213,7 @@ Integer numClasses = 2; HashMap categoricalFeaturesInfo = new HashMap(); String impurity = "gini"; Integer maxDepth = 5; -Integer maxBins = 100; +Integer maxBins = 32; // Train a DecisionTree model for classification. 
final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses, @@ -250,7 +250,7 @@ data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache() # Train a DecisionTree model. # Empty categoricalFeaturesInfo indicates all features are continuous. model = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={}, - impurity='gini', maxDepth=5, maxBins=100) + impurity='gini', maxDepth=5, maxBins=32) # Evaluate model on training instances and compute training error predictions = model.predict(data.map(lambda x: x.features)) @@ -293,7 +293,7 @@ val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "variance" val maxDepth = 5 -val maxBins = 100 +val maxBins = 32 val model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity, maxDepth, maxBins) @@ -338,7 +338,7 @@ JavaSparkContext sc = new JavaSparkContext(sparkConf); HashMap categoricalFeaturesInfo = new HashMap(); String impurit
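Pulling the fragments of the updated guide together, here is a minimal runnable form of the Scala classification example with the new defaults written out explicitly; the data path and parameter values come from the guide, while the SparkContext setup is assumed. Passing maxBins = 100 would restore the pre-SPARK-3443 behavior.

  // Minimal sketch following the updated docs/mllib-decision-tree.md example;
  // the SparkContext setup is assumed, not part of the commit.
  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.tree.DecisionTree
  import org.apache.spark.mllib.util.MLUtils

  object TreeDefaultsSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext("local", "TreeDefaultsSketch")
      val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache()

      // The new defaults spelled out: maxDepth = 5, maxBins = 32 (was 100).
      val numClasses = 2
      val categoricalFeaturesInfo = Map[Int, Int]()
      val impurity = "gini"
      val maxDepth = 5
      val maxBins = 32

      val model = DecisionTree.trainClassifier(
        data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
      println(model)
      sc.stop()
    }
  }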
git commit: [SPARK-3349][SQL] Output partitioning of limit should not be inherited from child
Repository: spark Updated Branches: refs/heads/master 08ce18881 -> 7db53391f [SPARK-3349][SQL] Output partitioning of limit should not be inherited from child This resolves https://issues.apache.org/jira/browse/SPARK-3349 Author: Eric Liang Closes #2262 from ericl/spark-3349 and squashes the following commits: 3e1b05c [Eric Liang] add regression test ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db53391 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db53391 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db53391 Branch: refs/heads/master Commit: 7db53391f1b349d1f49844197b34f94806f5e336 Parents: 08ce188 Author: Eric Liang Authored: Mon Sep 8 16:14:32 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 16:14:36 2014 -0700 -- .../spark/sql/execution/basicOperators.scala | 4 +++- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 + 2 files changed, 20 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 47bff0c..cac3766 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} import org.apache.spark.util.MutablePair /** @@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan) private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] override def output = child.output + override def outputPartitioning = SinglePartition /** * A custom implementation modeled after the take function on RDDs but which never runs any job @@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan) case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output = child.output + override def outputPartitioning = SinglePartition val ordering = new RowOrdering(sortOrder, child.output) http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1ac2059..e8fbc28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { (null, null, 6, "F") :: Nil) } + test("SPARK-3349 partitioning after limit") { +sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC") + .limit(2) + .registerTempTable("subset1") +sql("SELECT DISTINCT n FROM lowerCaseData") + .limit(2) + .registerTempTable("subset2") 
+checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"), + (3, "c", 3) :: + (4, "d", 4) :: Nil) +checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"), + (1, "a", 1) :: + (2, "b", 2) :: Nil) + } + test("mixed-case keywords") { checkAnswer( sql(
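The reasoning behind the fix can be shown with a toy planner model: a limit collects its rows into one partition, so if it advertises its child's partitioning, a later join may skip a shuffle that is actually needed, which is what the regression test above catches. The types below are illustrative only, not Spark's planner.

  // Toy sketch of SPARK-3349, not Spark's planner: before the fix, Limit
  // inherited child.outputPartitioning and a join on it could wrongly skip a shuffle.
  object LimitPartitioningSketch {
    sealed trait Partitioning
    case object SinglePartition extends Partitioning
    case class HashPartitioned(numPartitions: Int) extends Partitioning

    trait PhysicalOp { def outputPartitioning: Partitioning }
    case class Scan(parts: Int) extends PhysicalOp {
      def outputPartitioning: Partitioning = HashPartitioned(parts)
    }
    // After the fix: a limit always reports a single output partition.
    case class Limit(n: Int, child: PhysicalOp) extends PhysicalOp {
      def outputPartitioning: Partitioning = SinglePartition
    }

    // A join may only avoid a shuffle if both sides really share a partitioning.
    def needsShuffle(left: PhysicalOp, right: PhysicalOp): Boolean =
      left.outputPartitioning != right.outputPartitioning

    def main(args: Array[String]): Unit = {
      val big = Scan(parts = 200)
      val limited = Limit(2, Scan(parts = 200))
      // Before the fix, limited would also claim HashPartitioned(200) and the
      // planner could skip the exchange, producing the wrong join results.
      println(needsShuffle(big, limited)) // true: the planner must repartition
    }
  }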
[5/5] git commit: [SPARK-3019] Pluggable block transfer interface (BlockTransferService)
[SPARK-3019] Pluggable block transfer interface (BlockTransferService) This pull request creates a new BlockTransferService interface for block fetch/upload and refactors the existing ConnectionManager to implement BlockTransferService (NioBlockTransferService). Most of the changes are simply moving code around. The main class to inspect is ShuffleBlockFetcherIterator. Review guide: - Most of the ConnectionManager code is now in network.cm package - ManagedBuffer is a new buffer abstraction backed by several different implementations (file segment, nio ByteBuffer, Netty ByteBuf) - BlockTransferService is the main internal interface introduced in this PR - NioBlockTransferService implements BlockTransferService and replaces the old BlockManagerWorker - ShuffleBlockFetcherIterator replaces the old BlockFetcherIterator to use the new interface TODOs that should be separate PRs: - Implement NettyBlockTransferService - Finalize the API/semantics for ManagedBuffer.release() Author: Reynold Xin Closes #2240 from rxin/blockTransferService and squashes the following commits: 64cd9d7 [Reynold Xin] Merge branch 'master' into blockTransferService 1dfd3d7 [Reynold Xin] Limit the length of the FileInputStream. 1332156 [Reynold Xin] Fixed style violation from refactoring. 2960c93 [Reynold Xin] Added ShuffleBlockFetcherIteratorSuite. e29c721 [Reynold Xin] Updated comment for ShuffleBlockFetcherIterator. 8a1046e [Reynold Xin] Code review feedback: 2c6b1e1 [Reynold Xin] Removed println in test cases. 2a907e4 [Reynold Xin] Merge branch 'master' into blockTransferService-merge 07ccf0d [Reynold Xin] Added init check to CMBlockTransferService. 98c668a [Reynold Xin] Added failure handling and fixed unit tests. ae05fcd [Reynold Xin] Updated tests, although DistributedSuite is hanging. d8d595c [Reynold Xin] Merge branch 'master' of github.com:apache/spark into blockTransferService 9ef279c [Reynold Xin] Initial refactoring to move ConnectionManager to use the BlockTransferService.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08ce1888 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08ce1888 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08ce1888 Branch: refs/heads/master Commit: 08ce18881e09c6e91db9c410d1d9ce1e5ae63a62 Parents: 939a322 Author: Reynold Xin Authored: Mon Sep 8 15:59:20 2014 -0700 Committer: Reynold Xin Committed: Mon Sep 8 15:59:20 2014 -0700 -- .../main/scala/org/apache/spark/SparkEnv.scala | 15 +- .../apache/spark/network/BlockDataManager.scala | 36 + .../spark/network/BlockFetchingListener.scala | 37 + .../spark/network/BlockTransferService.scala| 131 +++ .../apache/spark/network/BufferMessage.scala| 113 -- .../org/apache/spark/network/Connection.scala | 587 -- .../org/apache/spark/network/ConnectionId.scala | 34 - .../spark/network/ConnectionManager.scala | 1047 -- .../spark/network/ConnectionManagerId.scala | 37 - .../spark/network/ConnectionManagerTest.scala | 103 -- .../apache/spark/network/ManagedBuffer.scala| 107 ++ .../org/apache/spark/network/Message.scala | 95 -- .../org/apache/spark/network/MessageChunk.scala | 41 - .../spark/network/MessageChunkHeader.scala | 82 -- .../org/apache/spark/network/ReceiverTest.scala | 37 - .../apache/spark/network/SecurityMessage.scala | 162 --- .../org/apache/spark/network/SenderTest.scala | 76 -- .../apache/spark/network/nio/BlockMessage.scala | 197 .../spark/network/nio/BlockMessageArray.scala | 160 +++ .../spark/network/nio/BufferMessage.scala | 114 ++ .../apache/spark/network/nio/Connection.scala | 587 ++ .../apache/spark/network/nio/ConnectionId.scala | 34 + .../spark/network/nio/ConnectionManager.scala | 1042 + .../spark/network/nio/ConnectionManagerId.scala | 37 + .../org/apache/spark/network/nio/Message.scala | 96 ++ .../apache/spark/network/nio/MessageChunk.scala | 41 + .../spark/network/nio/MessageChunkHeader.scala | 81 ++ .../network/nio/NioBlockTransferService.scala | 205 .../spark/network/nio/SecurityMessage.scala | 160 +++ .../spark/serializer/KryoSerializer.scala |2 +- .../spark/shuffle/FileShuffleBlockManager.scala | 35 +- .../shuffle/IndexShuffleBlockManager.scala | 24 +- .../spark/shuffle/ShuffleBlockManager.scala |6 +- .../shuffle/hash/BlockStoreShuffleFetcher.scala | 14 +- .../spark/shuffle/hash/HashShuffleReader.scala |4 +- .../spark/storage/BlockFetcherIterator.scala| 254 - .../org/apache/spark/storage/BlockManager.scala | 98 +- .../apache/spark/storage/BlockManagerId.scala |4 +- .../spark/storage/BlockManagerWorker.scala | 147 --- .../org/apache/spark/storage/BlockMessage.scala | 209 .../spark/storage/BlockMessa
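For orientation while reading the five-part diff that follows, an illustrative reduction of the new abstraction, based only on the names and signatures visible in this commit (BlockDataManager's getBlockData/putBlockData and BlockTransferService's init/hostName/port appear in the BlockManager changes in part 2/5). The real traits have more members, and the stand-in ManagedBuffer and StorageLevel types exist only so the sketch compiles on its own.

  // Illustrative reduction of the SPARK-3019 abstraction; not Spark's actual traits.
  import java.nio.ByteBuffer
  import scala.collection.mutable

  object BlockTransferSketch {
    // Stand-ins so this sketch is self-contained; Spark's types are richer.
    trait ManagedBuffer { def nioByteBuffer(): ByteBuffer }
    case class StorageLevel(useDisk: Boolean, useMemory: Boolean)

    // What BlockManager now implements: local read/write access to block data.
    trait BlockDataManager {
      def getBlockData(blockId: String): Option[ManagedBuffer]
      def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
    }

    // The pluggable transfer service is initialized with a BlockDataManager and
    // exposes its own host/port; NIO is the first implementation, Netty a TODO.
    trait BlockTransferService {
      def init(blockDataManager: BlockDataManager): Unit
      def hostName: String
      def port: Int
    }

    // A tiny in-memory BlockDataManager, just to exercise the interface shape.
    class InMemoryBlockDataManager extends BlockDataManager {
      private val blocks = mutable.Map.empty[String, ManagedBuffer]
      def getBlockData(blockId: String): Option[ManagedBuffer] = blocks.get(blockId)
      def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit =
        blocks(blockId) = data
    }

    def main(args: Array[String]): Unit = {
      val manager = new InMemoryBlockDataManager
      val payload = new ManagedBuffer {
        def nioByteBuffer(): ByteBuffer = ByteBuffer.wrap("hello".getBytes("UTF-8"))
      }
      manager.putBlockData("shuffle_0_0_0", payload, StorageLevel(useDisk = false, useMemory = true))
      println(manager.getBlockData("shuffle_0_0_0").isDefined) // true
    }
  }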
[4/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala deleted file mode 100644 index 4894ecd..000 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network - -import java.nio.ByteBuffer - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.io.Source - -import org.apache.spark._ - -private[spark] object ConnectionManagerTest extends Logging{ - def main(args: Array[String]) { -// - the master URL - a list slaves to run connectionTest on -// [num of tasks] - the number of parallel tasks to be initiated default is number of slave -// hosts [size of msg in MB (integer)] - the size of messages to be sent in each task, -// default is 10 [count] - how many times to run, default is 3 [await time in seconds] : -// await time (in seconds), default is 600 -if (args.length < 2) { - println("Usage: ConnectionManagerTest [num of tasks] " + -"[size of msg in MB (integer)] [count] [await time in seconds)] ") - System.exit(1) -} - -if (args(0).startsWith("local")) { - println("This runs only on a mesos cluster") -} - -val sc = new SparkContext(args(0), "ConnectionManagerTest") -val slavesFile = Source.fromFile(args(1)) -val slaves = slavesFile.mkString.split("\n") -slavesFile.close() - -/* println("Slaves") */ -/* slaves.foreach(println) */ -val tasknum = if (args.length > 2) args(2).toInt else slaves.length -val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 -val count = if (args.length > 4) args(4).toInt else 3 -val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second -println("Running " + count + " rounds of test: " + "parallel tasks = " + tasknum + ", " + - "msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime) -val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map( -i => SparkEnv.get.connectionManager.id).collect() -println("\nSlave ConnectionManagerIds") -slaveConnManagerIds.foreach(println) -println - -(0 until count).foreach(i => { - val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => { -val connManager = SparkEnv.get.connectionManager -val thisConnManagerId = connManager.id -connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { - logInfo("Received [" + msg + "] from [" + id + "]") - None -}) - -val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) -buffer.flip - -val startTime = System.currentTimeMillis -val futures = 
slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId => - { -val bufferMessage = Message.createBufferMessage(buffer.duplicate) -logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") -connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) - } -} -val results = futures.map(f => Await.result(f, awaitTime)) -val finishTime = System.currentTimeMillis -Thread.sleep(5000) - -val mb = size * results.size / 1024.0 / 1024.0 -val ms = finishTime - startTime -val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * - 1000.0) + " MB/s" -logInfo(resultStr) -resultStr - }).collect() - - println("-") - println("Run " + i) - resultStrs.foreach(println) - println("-") -}) - } -} - http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala -
[3/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala new file mode 100644 index 000..09d3ea3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -0,0 +1,1042 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.nio + +import java.io.IOException +import java.net._ +import java.nio._ +import java.nio.channels._ +import java.nio.channels.spi._ +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit} +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, SynchronizedMap, SynchronizedQueue} +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.language.postfixOps + +import org.apache.spark._ +import org.apache.spark.util.{SystemClock, Utils} + + +private[nio] class ConnectionManager( +port: Int, +conf: SparkConf, +securityManager: SecurityManager, +name: String = "Connection manager") + extends Logging { + + /** + * Used by sendMessageReliably to track messages being sent. 
+ * @param message the message that was sent + * @param connectionManagerId the connection manager that sent this message + * @param completionHandler callback that's invoked when the send has completed or failed + */ + class MessageStatus( + val message: Message, + val connectionManagerId: ConnectionManagerId, + completionHandler: MessageStatus => Unit) { + +/** This is non-None if message has been ack'd */ +var ackMessage: Option[Message] = None + +def markDone(ackMessage: Option[Message]) { + this.ackMessage = ackMessage + completionHandler(this) +} + } + + private val selector = SelectorProvider.provider.openSelector() + private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) + + // default to 30 second timeout waiting for authentication + private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) + private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) + + private val handleMessageExecutor = new ThreadPoolExecutor( +conf.getInt("spark.core.connection.handler.threads.min", 20), +conf.getInt("spark.core.connection.handler.threads.max", 60), +conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS, +new LinkedBlockingDeque[Runnable](), +Utils.namedThreadFactory("handle-message-executor")) + + private val handleReadWriteExecutor = new ThreadPoolExecutor( +conf.getInt("spark.core.connection.io.threads.min", 4), +conf.getInt("spark.core.connection.io.threads.max", 32), +conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS, +new LinkedBlockingDeque[Runnable](), +Utils.namedThreadFactory("handle-read-write-executor")) + + // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : + // which should be executed asap + private val handleConnectExecutor = new ThreadPoolExecutor( +conf.getInt("spark.core.connection.connect.threads.min", 1), +conf.getInt("spark.core.connection.connect.threads.max", 8), +conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS, +new LinkedBlockingDeque[Runnable](), +Utils.namedThreadFactory("handle-connect-executor")) + + private val serverChannel = ServerSocketChannel.open() + // used to track the SendingConnections waiting to do SASL negotiation + private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection] +with SynchronizedMap[ConnectionId, SendingConnection] + private val connectionsByKey = +new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] + private val connectionsById = new HashMap[Conne
[2/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a714142..d1bee3d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,6 +20,8 @@ package org.apache.spark.storage import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} +import scala.concurrent.ExecutionContext.Implicits.global + import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ @@ -58,18 +60,14 @@ private[spark] class BlockManager( defaultSerializer: Serializer, maxMemory: Long, val conf: SparkConf, -securityManager: SecurityManager, mapOutputTracker: MapOutputTracker, -shuffleManager: ShuffleManager) - extends BlockDataProvider with Logging { +shuffleManager: ShuffleManager, +blockTransferService: BlockTransferService) + extends BlockDataManager with Logging { - private val port = conf.getInt("spark.blockManager.port", 0) + blockTransferService.init(this) val diskBlockManager = new DiskBlockManager(this, conf) - val connectionManager = -new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") - - implicit val futureExecContext = connectionManager.futureExecContext private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] @@ -89,11 +87,7 @@ private[spark] class BlockManager( } val blockManagerId = BlockManagerId( -executorId, connectionManager.id.host, connectionManager.id.port) - - // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory - // for receiving shuffle outputs) - val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024 +executorId, blockTransferService.hostName, blockTransferService.port) // Whether to compress broadcast variables that are stored private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true) @@ -136,11 +130,11 @@ private[spark] class BlockManager( master: BlockManagerMaster, serializer: Serializer, conf: SparkConf, - securityManager: SecurityManager, mapOutputTracker: MapOutputTracker, - shuffleManager: ShuffleManager) = { + shuffleManager: ShuffleManager, + blockTransferService: BlockTransferService) = { this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), - conf, securityManager, mapOutputTracker, shuffleManager) + conf, mapOutputTracker, shuffleManager, blockTransferService) } /** @@ -149,7 +143,6 @@ private[spark] class BlockManager( */ private def initialize(): Unit = { master.registerBlockManager(blockManagerId, maxMemory, slaveActor) -BlockManagerWorker.startBlockManagerWorker(this) } /** @@ -212,21 +205,34 @@ private[spark] class BlockManager( } } - override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = { + /** + * Interface to get local block data. + * + * @return Some(buffer) if the block exists locally, and None if it doesn't. 
+ */ + override def getBlockData(blockId: String): Option[ManagedBuffer] = { val bid = BlockId(blockId) if (bid.isShuffle) { - shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]) + Some(shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId])) } else { val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] if (blockBytesOpt.isDefined) { -Right(blockBytesOpt.get) +val buffer = blockBytesOpt.get +Some(new NioByteBufferManagedBuffer(buffer)) } else { -throw new BlockNotFoundException(blockId) +None } } } /** + * Put the block locally, using the given storage level. + */ + override def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit = { +putBytes(BlockId(blockId), data.nioByteBuffer(), level) + } + + /** * Get the BlockStatus for the block identified by the given ID, if it exists. * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon. */ @@ -333,16 +339,10 @@ private[spark] class BlockManager( * shuffle blocks. It is safe to do so without a lock on block info since disk store * never deletes (recent) items. */ - def getLocalShuffleFromDisk( - blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - -val shuffleBlockManager = shuffleManager.shuffleBlock
[1/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)
Repository: spark Updated Branches: refs/heads/master 939a322c8 -> 08ce18881 http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c200654..e251660 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -21,15 +21,19 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays import java.util.concurrent.TimeUnit +import org.apache.spark.network.nio.NioBlockTransferService + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.language.postfixOps + import akka.actor._ import akka.pattern.ask import akka.util.Timeout -import org.apache.spark.shuffle.hash.HashShuffleManager -import org.mockito.invocation.InvocationOnMock -import org.mockito.Matchers.any -import org.mockito.Mockito.{doAnswer, mock, spy, when} -import org.mockito.stubbing.Answer +import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ @@ -38,18 +42,12 @@ import org.scalatest.Matchers import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} import org.apache.spark.executor.DataReadMethod -import org.apache.spark.network.{Message, ConnectionManagerId} import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.language.implicitConversions -import scala.language.postfixOps -import org.apache.spark.shuffle.ShuffleBlockManager class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { @@ -74,8 +72,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { -new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr, - mapOutputTracker, shuffleManager) +val transfer = new NioBlockTransferService(conf, securityMgr) +new BlockManager(name, actorSystem, master, serializer, maxMem, conf, + mapOutputTracker, shuffleManager, transfer) } before { @@ -793,8 +792,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("block store put failure") { // Use Java serializer so we can create an unserializable error. +val transfer = new NioBlockTransferService(conf, securityMgr) store = new BlockManager("", actorSystem, master, new JavaSerializer(conf), 1200, conf, - securityMgr, mapOutputTracker, shuffleManager) + mapOutputTracker, shuffleManager, transfer) // The put should fail since a1 is not serializable. 
class UnserializableClass @@ -1005,109 +1005,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") } - test("return error message when error occurred in BlockManagerWorker#onBlockMessageReceive") { -store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker, shuffleManager) - -val worker = spy(new BlockManagerWorker(store)) -val connManagerId = mock(classOf[ConnectionManagerId]) - -// setup request block messages -val reqBlId1 = ShuffleBlockId(0,0,0) -val reqBlId2 = ShuffleBlockId(0,1,0) -val reqBlockMessage1 = BlockMessage.fromGetBlock(GetBlock(reqBlId1)) -val reqBlockMessage2 = BlockMessage.fromGetBlock(GetBlock(reqBlId2)) -val reqBlockMessages = new BlockMessageArray( - Seq(reqBlockMessage1, reqBlockMessage2)) -val reqBufferMessage = reqBlockMessages.toBufferMessage - -val answer = new Answer[Option[BlockMessage]] { - override def answer(invocation: InvocationOnMock) - :Option[BlockMessage]= { -throw new Exception - } -} - -doAnswer(answer).when(worker).processBlockMessage(any()) - -// Test when exception was thrown during processing block messages -var ackMessage = worker.onBlockMessageReceive(reqBufferMessage, connManagerId) - -assert(ackMessage.isDefined, "When Exception was thrown in "
git commit: [SPARK-3417] Use new-style classes in PySpark
Repository: spark Updated Branches: refs/heads/master 26bc7655d -> 939a322c8 [SPARK-3417] Use new-style classes in PySpark Tiny PR making SQLContext a new-style class. This allows various type logic to work more effectively ```Python In [1]: import pyspark In [2]: pyspark.sql.SQLContext.mro() Out[2]: [pyspark.sql.SQLContext, object] ``` Author: Matthew Rocklin Closes #2288 from mrocklin/sqlcontext-new-style-class and squashes the following commits: 4aadab6 [Matthew Rocklin] update other old-style classes a2dc02f [Matthew Rocklin] pyspark.sql.SQLContext is new-style class Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/939a322c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/939a322c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/939a322c Branch: refs/heads/master Commit: 939a322c85956eda150b10afb2ed1d8d959a7bdf Parents: 26bc765 Author: Matthew Rocklin Authored: Mon Sep 8 15:45:28 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 15:45:36 2014 -0700 -- python/pyspark/mllib/random.py | 2 +- python/pyspark/mllib/util.py | 2 +- python/pyspark/sql.py | 2 +- python/pyspark/storagelevel.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/mllib/random.py -- diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 3e59c73..d53c95f 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -28,7 +28,7 @@ from pyspark.serializers import NoOpSerializer __all__ = ['RandomRDDs', ] -class RandomRDDs: +class RandomRDDs(object): """ Generator methods for creating RDDs comprised of i.i.d samples from some distribution. http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/mllib/util.py -- diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 4962d05..1c7b8c8 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -25,7 +25,7 @@ from pyspark.rdd import RDD from pyspark.serializers import NoOpSerializer -class MLUtils: +class MLUtils(object): """ Helper methods to load, save and pre-process data used in MLlib. http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 004d493..53eea6d 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -899,7 +899,7 @@ def _create_cls(dataType): return Row -class SQLContext: +class SQLContext(object): """Main entry point for Spark SQL functionality. http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/storagelevel.py -- diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index 2aa0fb9..676aa0f 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -18,7 +18,7 @@ __all__ = ["StorageLevel"] -class StorageLevel: +class StorageLevel(object): """ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SQL] Minor edits to sql programming guide.
Repository: spark Updated Branches: refs/heads/branch-1.1 8c6306a03 -> 7a236dcf8 [SQL] Minor edits to sql programming guide. Author: Henry Cook Closes #2316 from hcook/sql-docs and squashes the following commits: 373f94b [Henry Cook] Minor edits to sql programming guide. (cherry picked from commit 26bc7655de18ab0191ded3f75cb77bc756dc1c03) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a236dcf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a236dcf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a236dcf Branch: refs/heads/branch-1.1 Commit: 7a236dcf8e4721472cea6f1ae7b652618c118f43 Parents: 8c6306a Author: Henry Cook Authored: Mon Sep 8 14:56:37 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 14:56:53 2014 -0700 -- docs/sql-programming-guide.md | 92 +++--- 1 file changed, 47 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a236dcf/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 1814fef..d83efa4 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -13,10 +13,10 @@ title: Spark SQL Programming Guide Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using Spark. At the core of this component is a new type of RDD, -[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed -[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with +[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed of +[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table -in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`. @@ -26,10 +26,10 @@ All of the examples on this page use sample data included in the Spark distribut Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. At the core of this component is a new type of RDD, -[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed -[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with +[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed of +[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along with a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table -in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). 
@@ -37,10 +37,10 @@ file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive]( Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. At the core of this component is a new type of RDD, -[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed -[Row](api/python/pyspark.sql.Row-class.html) objects along with +[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of +[Row](api/python/pyspark.sql.Row-class.html) objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table -in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell. @@ -68,11 +68,11 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSc
git commit: [SQL] Minor edits to sql programming guide.
Repository: spark Updated Branches: refs/heads/master 386bc24eb -> 26bc7655d [SQL] Minor edits to sql programming guide. Author: Henry Cook Closes #2316 from hcook/sql-docs and squashes the following commits: 373f94b [Henry Cook] Minor edits to sql programming guide. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26bc7655 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26bc7655 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26bc7655 Branch: refs/heads/master Commit: 26bc7655de18ab0191ded3f75cb77bc756dc1c03 Parents: 386bc24 Author: Henry Cook Authored: Mon Sep 8 14:56:37 2014 -0700 Committer: Michael Armbrust Committed: Mon Sep 8 14:56:37 2014 -0700 -- docs/sql-programming-guide.md | 92 +++--- 1 file changed, 47 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26bc7655/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 1814fef..d83efa4 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -13,10 +13,10 @@ title: Spark SQL Programming Guide Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using Spark. At the core of this component is a new type of RDD, -[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed -[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with +[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed of +[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table -in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`. @@ -26,10 +26,10 @@ All of the examples on this page use sample data included in the Spark distribut Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. At the core of this component is a new type of RDD, -[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed -[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with +[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed of +[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along with a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table -in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). @@ -37,10 +37,10 @@ file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive]( Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using Spark. 
At the core of this component is a new type of RDD, -[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed -[Row](api/python/pyspark.sql.Row-class.html) objects along with +[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of +[Row](api/python/pyspark.sql.Row-class.html) objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table -in a traditional relational database. A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io) +in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io) file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell. @@ -68,11 +68,11 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchemaRDD {% endhighlight %} -In addition to the basic SQLContext, you can also create a HiveContext, which p
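For illustration, a minimal sketch of the SchemaRDD-creation pattern the guide excerpt above describes: build a SQLContext, turn an RDD of case classes into a SchemaRDD via the createSchemaRDD implicit, register it as a table, and query it with SQL. This assumes a spark-shell session where `sc` is already defined and the Spark 1.1-era SQL API; the case class, sample rows, and table name are made up for the example.

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)    // schema is inferred from the case class fields

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD            // implicit conversion RDD[Person] -> SchemaRDD

val people = sc.parallelize(Seq(Person("Alice", 29), Person("Bob", 35)))
people.registerTempTable("people")           // the SchemaRDD now behaves like a table

val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 21")
adults.collect().foreach(println)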
git commit: Provide a default PYSPARK_PYTHON for python/run_tests
Repository: spark Updated Branches: refs/heads/master 16a73c247 -> 386bc24eb Provide a default PYSPARK_PYTHON for python/run_tests Without this the version of python used in the test is not recorded. The error is, Testing with Python version: ./run-tests: line 57: --version: command not found Author: Matthew Farrellee Closes #2300 from mattf/master-fix-python-run-tests and squashes the following commits: 65a09f5 [Matthew Farrellee] Provide a default PYSPARK_PYTHON for python/run_tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/386bc24e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/386bc24e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/386bc24e Branch: refs/heads/master Commit: 386bc24ebe3e75875b9647d9223c62d7b9dc9963 Parents: 16a73c2 Author: Matthew Farrellee Authored: Mon Sep 8 12:37:52 2014 -0700 Committer: Josh Rosen Committed: Mon Sep 8 12:37:52 2014 -0700 -- python/run-tests | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/386bc24e/python/run-tests -- diff --git a/python/run-tests b/python/run-tests index 226e9e2..d98840d 100755 --- a/python/run-tests +++ b/python/run-tests @@ -50,6 +50,8 @@ function run_test() { echo "Running PySpark tests. Output is in python/unit-tests.log." +export PYSPARK_PYTHON="python" + # Try to test with Python 2.6, since that's the minimum version that we support: if [ $(which python2.6) ]; then export PYSPARK_PYTHON="python2.6"
git commit: SPARK-2978. Transformation with MR shuffle semantics
Repository: spark Updated Branches: refs/heads/master e16a8e7db -> 16a73c247 SPARK-2978. Transformation with MR shuffle semantics I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful. Author: Sandy Ryza Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits: 4a5332a [Sandy Ryza] Fix Java test c04b447 [Sandy Ryza] Fix Python doc and add back deleted code 433ad5b [Sandy Ryza] Add Java test 4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes 9b0ba99 [Sandy Ryza] Fix compilation 36e0571 [Sandy Ryza] Fix import ordering 48c12c2 [Sandy Ryza] Add Java version and additional doc e5381cd [Sandy Ryza] Fix python style warnings f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16a73c24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16a73c24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16a73c24 Branch: refs/heads/master Commit: 16a73c2473181e03d88001aa3e08e6ffac92eb8b Parents: e16a8e7 Author: Sandy Ryza Authored: Mon Sep 8 11:20:00 2014 -0700 Committer: Matei Zaharia Committed: Mon Sep 8 11:20:00 2014 -0700 -- .../org/apache/spark/api/java/JavaPairRDD.scala | 26 + .../apache/spark/rdd/OrderedRDDFunctions.scala | 14 - .../java/org/apache/spark/JavaAPISuite.java | 30 .../scala/org/apache/spark/rdd/RDDSuite.scala | 14 + python/pyspark/rdd.py | 24 python/pyspark/tests.py | 8 ++ 6 files changed, 115 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index feeb6c0..880f61c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -759,6 +759,32 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) } /** + * Repartition the RDD according to the given partitioner and, within each resulting partition, + * sort records by their keys. + * + * This is more efficient than calling `repartition` and then sorting within each partition + * because it can push the sorting down into the shuffle machinery. + */ + def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V] = { +val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] +repartitionAndSortWithinPartitions(partitioner, comp) + } + + /** + * Repartition the RDD according to the given partitioner and, within each resulting partition, + * sort records by their keys. + * + * This is more efficient than calling `repartition` and then sorting within each partition + * because it can push the sorting down into the shuffle machinery. + */ + def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]) +: JavaPairRDD[K, V] = { +implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. +fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner)) + } + + /** * Sort the RDD by key, so that each partition contains a sorted range of the elements in * ascending order. 
Calling `collect` or `save` on the resulting RDD will return or output an * ordered list of records (in the `save` case, they will be written to multiple `part-X` files http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index e98bad2..d0dbfef 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Logging, RangePartitioner} +import org.apache.spark.{Logging, Partitioner, RangePartitioner} import org.apache.spark.annotation.DeveloperApi /** @@ -64,4 +64,16 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) } + + /
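A minimal usage sketch of the new transformation from the Scala API; the method and its contract come from the diff above, while the SparkContext setup, keys, and partition count are made up for the example. The point is that the per-partition sort is pushed into the shuffle rather than run as a separate step after repartitioning.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD / ordered-RDD implicits (Spark 1.x)

val sc = new SparkContext(new SparkConf().setAppName("sort-in-shuffle").setMaster("local[2]"))
val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

// Each of the 2 output partitions holds its records sorted by key.
val shuffledSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
shuffledSorted.glom().collect().foreach(part => println(part.mkString(", ")))

sc.stop()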
git commit: SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within.
Repository: spark Updated Branches: refs/heads/master 711356b42 -> e16a8e7db SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within. ... Tested ! TBH, it isn't a great idea to have directory with spaces within. Because emacs doesn't like it then hadoop doesn't like it. and so on... Author: Prashant Sharma Closes #2229 from ScrapCodes/SPARK-3337/quoting-shell-scripts and squashes the following commits: d4ad660 [Prashant Sharma] SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e16a8e7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e16a8e7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e16a8e7d Branch: refs/heads/master Commit: e16a8e7db5a3b1065b14baf89cb723a59b99226b Parents: 711356b Author: Prashant Sharma Authored: Mon Sep 8 10:24:15 2014 -0700 Committer: Andrew Or Committed: Mon Sep 8 10:24:15 2014 -0700 -- bin/beeline | 2 +- bin/compute-classpath.sh | 12 ++-- bin/load-spark-env.sh| 4 ++-- bin/pyspark | 20 ++-- bin/run-example | 8 bin/spark-class | 20 ++-- bin/spark-shell | 10 +- bin/spark-sql| 8 bin/spark-submit | 4 ++-- dev/check-license| 16 dev/lint-python | 6 +++--- dev/mima | 4 ++-- dev/run-tests| 2 +- dev/scalastyle | 2 +- make-distribution.sh | 2 +- python/run-tests | 6 -- sbin/slaves.sh | 12 ++-- sbin/spark-config.sh | 16 sbin/spark-daemon.sh | 20 ++-- sbin/spark-executor | 8 sbin/start-all.sh| 4 ++-- sbin/start-history-server.sh | 4 ++-- sbin/start-master.sh | 4 ++-- sbin/start-slave.sh | 4 ++-- sbin/start-slaves.sh | 12 ++-- sbin/start-thriftserver.sh | 8 sbin/stop-all.sh | 4 ++-- sbin/stop-history-server.sh | 4 ++-- sbt/sbt | 20 ++-- sbt/sbt-launch-lib.bash | 12 ++-- 30 files changed, 130 insertions(+), 128 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e16a8e7d/bin/beeline -- diff --git a/bin/beeline b/bin/beeline index 1bda4db..3fcb6df 100755 --- a/bin/beeline +++ b/bin/beeline @@ -24,7 +24,7 @@ set -o posix # Figure out where Spark is installed -FWDIR="$(cd `dirname $0`/..; pwd)" +FWDIR="$(cd "`dirname "$0"`"/..; pwd)" CLASS="org.apache.hive.beeline.BeeLine" exec "$FWDIR/bin/spark-class" $CLASS "$@" http://git-wip-us.apache.org/repos/asf/spark/blob/e16a8e7d/bin/compute-classpath.sh -- diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 16b794a..15c6779 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -23,9 +23,9 @@ SCALA_VERSION=2.10 # Figure out where Spark is installed -FWDIR="$(cd `dirname $0`/..; pwd)" +FWDIR="$(cd "`dirname "$0"`"/..; pwd)" -. $FWDIR/bin/load-spark-env.sh +. "$FWDIR"/bin/load-spark-env.sh # Build up classpath CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf" @@ -63,7 +63,7 @@ else assembly_folder="$ASSEMBLY_DIR" fi -num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l) +num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)" if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $assembly_folder" echo "You need to build Spark before running this program." 
@@ -77,7 +77,7 @@ if [ "$num_jars" -gt "1" ]; then exit 1 fi -ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null) +ASSEMBLY_JAR="$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)" # Verify that versions of java used to build the jars and run Spark are compatible jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1) @@ -103,8 +103,8 @@ else datanucleus_dir="$FWDIR"/lib_managed/jars fi -datanucleus_jars=$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar") -datanucleus_jars=$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g) +datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")" +datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)" if [ -n "$datanucleus_jars" ]; then hive_files=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null) http://git-wip
[1/2] [SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib] DecisionTree aggregation improvements
Repository: spark Updated Branches: refs/heads/master 0d1cc4ae4 -> 711356b42 http://git-wip-us.apache.org/repos/asf/spark/blob/711356b4/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala new file mode 100644 index 000..866d85a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.tree.impl + +import org.apache.spark.mllib.tree.impurity._ + +/** + * DecisionTree statistics aggregator. + * This holds a flat array of statistics for a set of (nodes, features, bins) + * and helps with indexing. + */ +private[tree] class DTStatsAggregator( +val metadata: DecisionTreeMetadata, +val numNodes: Int) extends Serializable { + + /** + * [[ImpurityAggregator]] instance specifying the impurity type. + */ + val impurityAggregator: ImpurityAggregator = metadata.impurity match { +case Gini => new GiniAggregator(metadata.numClasses) +case Entropy => new EntropyAggregator(metadata.numClasses) +case Variance => new VarianceAggregator() +case _ => throw new IllegalArgumentException(s"Bad impurity parameter: ${metadata.impurity}") + } + + /** + * Number of elements (Double values) used for the sufficient statistics of each bin. + */ + val statsSize: Int = impurityAggregator.statsSize + + val numFeatures: Int = metadata.numFeatures + + /** + * Number of bins for each feature. This is indexed by the feature index. + */ + val numBins: Array[Int] = metadata.numBins + + /** + * Number of splits for the given feature. + */ + def numSplits(featureIndex: Int): Int = metadata.numSplits(featureIndex) + + /** + * Indicator for each feature of whether that feature is an unordered feature. + * TODO: Is Array[Boolean] any faster? + */ + def isUnordered(featureIndex: Int): Boolean = metadata.isUnordered(featureIndex) + + /** + * Offset for each feature for calculating indices into the [[allStats]] array. + */ + private val featureOffsets: Array[Int] = { +def featureOffsetsCalc(total: Int, featureIndex: Int): Int = { + if (isUnordered(featureIndex)) { +total + 2 * numBins(featureIndex) + } else { +total + numBins(featureIndex) + } +} +Range(0, numFeatures).scanLeft(0)(featureOffsetsCalc).map(statsSize * _).toArray + } + + /** + * Number of elements for each node, corresponding to stride between nodes in [[allStats]]. + */ + private val nodeStride: Int = featureOffsets.last + + /** + * Total number of elements stored in this aggregator. + */ + val allStatsSize: Int = numNodes * nodeStride + + /** + * Flat array of elements. 
+ * Index for start of stats for a (node, feature, bin) is: + * index = nodeIndex * nodeStride + featureOffsets(featureIndex) + binIndex * statsSize + * Note: For unordered features, the left child stats have binIndex in [0, numBins(featureIndex)) + * and the right child stats in [numBins(featureIndex), 2 * numBins(featureIndex)) + */ + val allStats: Array[Double] = new Array[Double](allStatsSize) + + /** + * Get an [[ImpurityCalculator]] for a given (node, feature, bin). + * @param nodeFeatureOffset For ordered features, this is a pre-computed (node, feature) offset + * from [[getNodeFeatureOffset]]. + * For unordered features, this is a pre-computed + * (node, feature, left/right child) offset from + * [[getLeftRightNodeFeatureOffsets]]. + */ + def getImpurityCalculator(nodeFeatureOffset: Int, binIndex: Int): ImpurityCalculator = { +impurityAggregator.getCalculator(allStats, nodeFeatureOffset + binIndex * statsSize) + } + + /** + * Update the stats for a given (node, feature, bin) for ordered features, using the given label. + */ + def update(nodeIndex: Int, featureIndex
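To make the flat indexing concrete, here is a small self-contained sketch of the formula quoted above (index = nodeIndex * nodeStride + featureOffsets(featureIndex) + binIndex * statsSize). The sizes are made up and all features are assumed ordered, so each feature contributes numBins(f) * statsSize elements per node; this is illustration only, not the MLlib code.

object FlatIndexSketch {
  def main(args: Array[String]): Unit = {
    val statsSize = 3                        // e.g. class counts for a 3-class Gini impurity
    val numBins = Array(4, 10, 32)           // bins per feature (all ordered in this sketch)
    // Offset (in Doubles) at which each feature's stats start within one node's block.
    val featureOffsets = numBins.scanLeft(0)(_ + _).map(_ * statsSize)   // Array(0, 12, 42, 138)
    val nodeStride = featureOffsets.last     // 138 Doubles per node

    def index(nodeIndex: Int, featureIndex: Int, binIndex: Int): Int =
      nodeIndex * nodeStride + featureOffsets(featureIndex) + binIndex * statsSize

    // Stats for (node 2, feature 1, bin 7) start at 2*138 + 12 + 7*3 = 309.
    println(index(nodeIndex = 2, featureIndex = 1, binIndex = 7))
  }
}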
[2/2] git commit: [SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib] DecisionTree aggregation improvements
[SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib] DecisionTree aggregation improvements

Summary:
1. Variable numBins for each feature [SPARK-3043]
2. Reduced data reshaping in aggregation [SPARK-3043]
3. Choose ordering for ordered categorical features adaptively [SPARK-3156]
4. Changed nodes to use 1-indexing [SPARK-3086]
5. Small clean-ups

Note: This PR looks bigger than it is since I moved several functions from inside findBestSplitsPerGroup to outside of it (to make it clear what was being serialized in the aggregation).

Speedups: This update helps most when many features use few bins but a few features use many bins. Some example results on speedups with 2M examples, 3.5K features (15-worker EC2 cluster):
* Example where old code was reasonably efficient (1/2 continuous, 1/4 binary, 1/4 20-category): 164.813 --> 116.491 sec
* Example where old code wasted many bins (1/10 continuous, 81/100 binary, 9/100 20-category): 128.701 --> 39.334 sec

Details:

(1) Variable numBins for each feature [SPARK-3043]
DecisionTreeMetadata now computes a variable numBins for each feature. It also tracks numSplits.

(2) Reduced data reshaping in aggregation [SPARK-3043]
Added DTStatsAggregator, a wrapper around the aggregate statistics array for easy but efficient indexing.
* Added ImpurityAggregator and ImpurityCalculator classes, to make DecisionTree code more oblivious to the type of impurity.
* Design note: I originally tried creating Impurity classes which stored data and storing the aggregates in an Array[Array[Array[Impurity]]]. However, this led to significant slowdowns, perhaps because of overhead in creating so many objects.
The aggregate statistics are never reshaped, and cumulative sums are computed in-place.
Updated the layout of aggregation functions. The update simplifies things by (1) dividing features into ordered/unordered (instead of ordered/unordered/continuous) and (2) making use of the DTStatsAggregator for indexing.
For this update, the following functions were refactored:
* updateBinForOrderedFeature
* updateBinForUnorderedFeature
* binaryOrNotCategoricalBinSeqOp
* multiclassWithCategoricalBinSeqOp
* regressionBinSeqOp
The above 5 functions were replaced with:
* orderedBinSeqOp
* someUnorderedBinSeqOp
Other changes:
* calculateGainForSplit now treats all feature types the same way.
* Eliminated extractLeftRightNodeAggregates.

(3) Choose ordering for ordered categorical features adaptively [SPARK-3156]
Updated binsToBestSplit():
* This now computes cumulative sums of stats for ordered features.
* For ordered categorical features, it chooses an ordering for categories. (This used to be done by findSplitsBins.)
* Uses iterators to shorten code and avoid building an Array[Array[InformationGainStats]].
Side effects:
* In findSplitsBins: A sample of the data is only taken for data with continuous features. It is not needed for data with only categorical features.
* In findSplitsBins: splits and bins are no longer pre-computed for ordered categorical features since they are not needed.
* TreePoint binning is simpler for categorical features.

(4) Changed nodes to use 1-indexing [SPARK-3086]
Nodes used to be indexed from 0. Now they are indexed from 1. Node indexing functions are now collected in object Node (Node.scala).

(5) Small clean-ups
Eliminated functions extractNodeInfo() and extractInfoForLowerLevels() to reduce duplicate code.
Eliminated InvalidBinIndex since it is no longer used.

CC: mengxr manishamde Please let me know if you have thoughts on this--thanks!

Author: Joseph K.
Bradley Closes #2125 from jkbradley/dt-opt3alt and squashes the following commits: 42c192a [Joseph K. Bradley] Merge branch 'rfs' into dt-opt3alt d3cc46b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into dt-opt3alt 00e4404 [Joseph K. Bradley] optimization for TreePoint construction (pre-computing featureArity and isUnordered as arrays) 425716c [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into rfs a2acea5 [Joseph K. Bradley] Small optimizations based on profiling aa4e4df [Joseph K. Bradley] Updated DTStatsAggregator with bug fix (nodeString should not be multiplied by statsSize) 4651154 [Joseph K. Bradley] Changed numBins semantics for unordered features. * Before: numBins = numSplits = (1 << k - 1) - 1 * Now: numBins = 2 * numSplits = 2 * [(1 << k - 1) - 1] * This also involved changing the semantics of: ** DecisionTreeMetadata.numUnorderedBins() 1e3b1c7 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into dt-opt3alt 1485fcc [Joseph K. Bradley] Made some DecisionTree methods private. 92f934f [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into dt-opt3alt e676da1 [Joseph K. Bradley] Updated documentation for DecisionTree 37ca845 [Joseph K. Bradley] Fixed problem with how DecisionTree handles ordered categorical features. 105f8ab [Joseph K. Bradley] Removed commented-out getEmptyBinA
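A simplified, made-up sketch of the "cumulative sums computed in-place" idea from points (2) and (3) above, using plain label counts as the bin statistics: after a prefix sum over the bins of an ordered feature, bin b holds the left-child stats for the candidate split after bin b, and the right-child stats fall out by subtracting from the node totals. This is illustration only, not the MLlib implementation.

object CumulativeSplitSketch {
  def main(args: Array[String]): Unit = {
    val numClasses = 2
    // binStats(bin)(label): count of training examples with that label landing in that bin.
    val binStats = Array(Array(5.0, 1.0), Array(2.0, 2.0), Array(1.0, 4.0), Array(0.0, 5.0))

    // In-place prefix sums over bins: binStats(b) becomes the left-child stats
    // for the split "feature value <= upper edge of bin b".
    for (b <- 1 until binStats.length; c <- 0 until numClasses)
      binStats(b)(c) += binStats(b - 1)(c)

    val total = binStats.last                // stats for the whole node
    for (split <- 0 until binStats.length - 1) {
      val left = binStats(split)
      val right = Array.tabulate(numClasses)(c => total(c) - left(c))
      println(s"split $split: left=[${left.mkString(", ")}] right=[${right.mkString(", ")}]")
    }
  }
}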
git commit: [HOTFIX] A leftover version change. It should make MiMa happy.
Repository: spark Updated Branches: refs/heads/master eddfeddac -> 0d1cc4ae4 [HOTFIX] A leftover version change. It should make MiMa happy. Author: Prashant Sharma Closes #2317 from ScrapCodes/hotfix and squashes the following commits: b6472d4 [Prashant Sharma] [HOTFIX] for hotfixes, a left over version change. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d1cc4ae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d1cc4ae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d1cc4ae Branch: refs/heads/master Commit: 0d1cc4ae42e1f73538dd8b9b1880ca9e5b124108 Parents: eddfedd Author: Prashant Sharma Authored: Mon Sep 8 14:32:53 2014 +0530 Committer: Prashant Sharma Committed: Mon Sep 8 14:32:53 2014 +0530 -- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d1cc4ae/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a26c2c9..45f6d29 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -184,7 +184,7 @@ object OldDeps { def versionArtifact(id: String): Option[sbt.ModuleID] = { val fullId = id + "_2.10" -Some("org.apache.spark" % fullId % "1.0.0") +Some("org.apache.spark" % fullId % "1.1.0") } def oldDepsSettings() = Defaults.defaultSettings ++ Seq(