git commit: SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors

2014-09-08 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 2b7ab814f -> 092e2f152


SPARK-2425 Don't kill a still-running Application because of some misbehaving 
Executors

Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master 
from removing an Application with RUNNING Executors.

Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits 
over the entire lifetime of the Application, allow that many since any Executor 
successfully began running the Application; 2) Don't remove the Application 
while Master still thinks that there are RUNNING Executors.

This should be fine as long as the ApplicationInfo doesn't believe any 
Executors are forever RUNNING when they are not.  I think that any non-RUNNING 
Executors will eventually no longer be RUNNING in Master's accounting, but 
another set of eyes should confirm that.  This PR also doesn't try to detect 
which nodes have gone rogue or to kill off bad Workers, so repeatedly failing 
Executors will continue to fail and fill up log files with failure reports as 
long as the Application keeps running.
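
For readers skimming the thread, a minimal self-contained sketch of the retry
accounting described above (toy types only, not the actual Master/ApplicationInfo
code; MAX_NUM_RETRY is an assumed limit):

```scala
// Toy sketch of the retry bookkeeping -- illustrative, not the real Spark classes.
object RetrySketch {
  val MAX_NUM_RETRY = 10   // assumed limit for the sketch

  sealed trait ExecState
  case object LOADING extends ExecState   // new state before an Executor reaches RUNNING
  case object RUNNING extends ExecState
  case object FAILED  extends ExecState

  class AppInfo {
    private var _retryCount = 0
    var executors = Map.empty[Int, ExecState]
    def incrementRetryCount(): Int = { _retryCount += 1; _retryCount }
    def resetRetryCount(): Unit = _retryCount = 0
    def retryCount: Int = _retryCount
  }

  /** Returns true if the application should be removed after this status update. */
  def onExecutorUpdate(app: AppInfo, execId: Int, state: ExecState): Boolean = {
    app.executors += execId -> state
    if (state == RUNNING) app.resetRetryCount()   // an Executor made it: forgive earlier failures
    if (state == FAILED) {
      app.executors -= execId                     // drop the finished executor
      val anyStillRunning = app.executors.values.exists(_ == RUNNING)
      // Give up only when the retry budget is spent *and* nothing is still RUNNING.
      app.incrementRetryCount() >= MAX_NUM_RETRY && !anyStillRunning
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val app = new AppInfo
    app.executors += 1 -> RUNNING                                   // one healthy executor
    val removals = (2 to 20).map(id => onExecutorUpdate(app, id, FAILED))
    println(removals.exists(identity))                              // false: the app survives the flaky executors
  }
}
```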

Author: Mark Hamstra 

Closes #1360 from markhamstra/SPARK-2425 and squashes the following commits:

f099c0b [Mark Hamstra] Reuse appInfo
b2b7b25 [Mark Hamstra] Moved 'Application failed' logging
bdd0928 [Mark Hamstra] switched to string interpolation
1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState 
transition and prevent Master from removing Application with RUNNING Executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/092e2f15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/092e2f15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/092e2f15

Branch: refs/heads/master
Commit: 092e2f152fb674e7200cc8a2cb99a8fe0a9b2b33
Parents: 2b7ab81
Author: Mark Hamstra 
Authored: Mon Sep 8 20:51:56 2014 -0700
Committer: Andrew Or 
Committed: Mon Sep 8 20:51:56 2014 -0700

--
 .../spark/deploy/master/ApplicationInfo.scala   |  4 ++-
 .../org/apache/spark/deploy/master/Master.scala | 26 
 .../spark/deploy/worker/ExecutorRunner.scala|  2 ++
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 4 files changed, 22 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/092e2f15/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index d367442..c3ca43f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
 _retryCount += 1
 _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
 state = endState
 endTime = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/092e2f15/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2a66fcf..a3909d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
   execOption match {
 case Some(exec) => {
+  val appInfo = idToApp(appId)
   exec.state = state
+  if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
  exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
   if (ExecutorState.isFinished(state)) {
-val appInfo = idToApp(appId)
 // Remove this executor from the worker and app
-logInfo("Removing executor " + exec.fullId + " because it is " + 
state)
+logInfo(s"Removing executor ${exec.fullId} because it is $state")
 appInfo.removeExecutor(exec)
 exec.worker.removeExecutor(exec)
 
-val normalExit = exitStatus.exists(_ == 0)
+val normalExit = exitStatus == Some(0)
 // Only retry certain number of times so we don't go into an infinite loop.
-if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM

git commit: [SPARK-3329][SQL] Don't depend on Hive SET pair ordering in tests.

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master dc1dbf206 -> 2b7ab814f


[SPARK-3329][SQL] Don't depend on Hive SET pair ordering in tests.

This fixes some possible spurious test failures in `HiveQuerySuite` by 
comparing sets of key-value pairs as sets, rather than as lists.
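
A small standalone illustration of the idea (the sample strings are made up, and
this is not the test code itself): parse `key=value` rows and compare them as a
`Set`, so the assertion no longer depends on the order the pairs come back in.

```scala
// Order-insensitive comparison of SET output; sample strings are invented.
object SetPairSketch extends App {
  val KV = "([^=]+)=([^=]*)".r

  def toPairs(rows: Seq[String]): Set[(String, String)] =
    rows.collect { case KV(k, v) => k -> v }.toSet

  // Two runs returning the same pairs in different orders still compare equal.
  val run1 = Seq("spark.sql.key.a=1", "spark.sql.key.b=2")
  val run2 = Seq("spark.sql.key.b=2", "spark.sql.key.a=1")
  assert(toPairs(run1) == toPairs(run2))
  println(toPairs(run1))
}
```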

Author: William Benton 
Author: Aaron Davidson 

Closes #2220 from willb/spark-3329 and squashes the following commits:

3b3e205 [William Benton] Collapse collectResults case match in HiveQuerySuite
6525d8e [William Benton] Handle cases where SET returns Rows of (single) strings
cf11b0e [Aaron Davidson] Fix flakey HiveQuerySuite test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b7ab814
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b7ab814
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b7ab814

Branch: refs/heads/master
Commit: 2b7ab814f9bde65ebc57ebd04386e56c97f06f4a
Parents: dc1dbf2
Author: William Benton 
Authored: Mon Sep 8 19:29:18 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 19:29:23 2014 -0700

--
 .../sql/hive/execution/HiveQuerySuite.scala | 47 +++-
 1 file changed, 26 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b7ab814/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 305998c..6bf8d18 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -558,62 +558,67 @@ class HiveQuerySuite extends HiveComparisonTest {
 val testKey = "spark.sql.key.usedfortestonly"
 val testVal = "test.val.0"
 val nonexistentKey = "nonexistent"
-
+val KV = "([^=]+)=([^=]*)".r
+def collectResults(rdd: SchemaRDD): Set[(String, String)] = 
+  rdd.collect().map { 
+case Row(key: String, value: String) => key -> value 
+case Row(KV(key, value)) => key -> value
+  }.toSet
 clear()
 
 // "set" itself returns all config variables currently specified in 
SQLConf.
 // TODO: Should we be listing the default here always? probably...
 assert(sql("SET").collect().size == 0)
 
-assertResult(Array(s"$testKey=$testVal")) {
-  sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal)) {
+  collectResults(hql(s"SET $testKey=$testVal"))
 }
 
 assert(hiveconf.get(testKey, "") == testVal)
-assertResult(Array(s"$testKey=$testVal")) {
-  sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal)) {
+  collectResults(hql("SET"))
 }
 
 sql(s"SET ${testKey + testKey}=${testVal + testVal}")
 assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
-  sql(s"SET").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
+  collectResults(hql("SET"))
 }
 
 // "set key"
-assertResult(Array(s"$testKey=$testVal")) {
-  sql(s"SET $testKey").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal)) {
+  collectResults(hql(s"SET $testKey"))
 }
 
-assertResult(Array(s"$nonexistentKey=")) {
-  sql(s"SET $nonexistentKey").collect().map(_.getString(0))
+assertResult(Set(nonexistentKey -> "")) {
+  collectResults(hql(s"SET $nonexistentKey"))
 }
 
 // Assert that sql() should have the same effects as sql() by repeating the above using sql().
 clear()
 assert(sql("SET").collect().size == 0)
 
-assertResult(Array(s"$testKey=$testVal")) {
-  sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal)) {
+  collectResults(sql(s"SET $testKey=$testVal"))
 }
 
 assert(hiveconf.get(testKey, "") == testVal)
-assertResult(Array(s"$testKey=$testVal")) {
-  sql("SET").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal)) {
+  collectResults(sql("SET"))
 }
 
 sql(s"SET ${testKey + testKey}=${testVal + testVal}")
 assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
-  sql("SET").collect().map(_.getString(0))
+assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
+  collectResults(sql("SET"))
 }
 
-

git commit: [SPARK-3414][SQL] Stores analyzed logical plan when registering a temp table

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master ca0348e68 -> dc1dbf206


[SPARK-3414][SQL] Stores analyzed logical plan when registering a temp table

Case insensitivity breaks when an unresolved relation contains attributes with 
uppercase letters in their names, because we store the unanalyzed logical plan when 
registering temp tables, while the `CaseInsensitivityAttributeReferences` batch 
runs before the `Resolution` batch. To fix this issue, we need to store the 
analyzed logical plan instead.
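
A toy sketch of the ordering issue (simplified stand-ins, not the actual Catalyst
rules or classes): a plan stored before analysis still carries raw, mixed-case
attribute names, while a plan stored after analysis is already normalized and
resolved.

```scala
// Toy stand-ins for "unanalyzed" vs. "analyzed" plans -- illustrative only.
sealed trait Plan
case class UnresolvedRelation(attrs: Seq[String]) extends Plan   // raw, mixed-case names
case class ResolvedRelation(attrs: Seq[String]) extends Plan     // normalized and resolved

object TempTableSketch extends App {
  // "Analysis" here = normalize case, then resolve.
  def analyze(p: Plan): Plan = p match {
    case UnresolvedRelation(attrs)  => ResolvedRelation(attrs.map(_.toLowerCase))
    case resolved: ResolvedRelation => resolved
  }

  val catalog = scala.collection.mutable.Map.empty[String, Plan]

  // Conceptually, before the fix the raw plan was stored...
  catalog("rawLogs_before") = UnresolvedRelation(Seq("Filename", "Message"))
  // ...and after the fix the analyzed plan is stored.
  catalog("rawLogs_after") = analyze(UnresolvedRelation(Seq("Filename", "Message")))

  println(catalog("rawLogs_before"))  // UnresolvedRelation(List(Filename, Message))
  println(catalog("rawLogs_after"))   // ResolvedRelation(List(filename, message))
}
```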

Author: Cheng Lian 

Closes #2293 from liancheng/spark-3414 and squashes the following commits:

d9fa1d6 [Cheng Lian] Stores analyzed logical plan when registering a temp table


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc1dbf20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc1dbf20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc1dbf20

Branch: refs/heads/master
Commit: dc1dbf206e0076a43ad2120d8bb5b1fc6912fe25
Parents: ca0348e
Author: Cheng Lian 
Authored: Mon Sep 8 19:08:05 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 19:08:05 2014 -0700

--
 .../scala/org/apache/spark/sql/SQLContext.scala |  4 ++--
 .../sql/hive/execution/HiveQuerySuite.scala | 25 +---
 2 files changed, 24 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc1dbf20/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5acb45c..a2f334a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -246,7 +246,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-catalog.registerTable(None, tableName, rdd.logicalPlan)
+catalog.registerTable(None, tableName, rdd.queryExecution.analyzed)
   }
 
   /**
@@ -411,7 +411,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 protected def stringOrError[A](f: => A): String =
   try f.toString catch { case e: Throwable => e.toString }
 
-def simpleString: String = 
+def simpleString: String =
   s"""== Physical Plan ==
  |${stringOrError(executedPlan)}
   """

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1dbf20/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index f4217a5..305998c 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.File
-
 import scala.util.Try
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -514,6 +511,28 @@ class HiveQuerySuite extends HiveComparisonTest {
 sql("DROP TABLE alter1")
   }
 
+  case class LogEntry(filename: String, message: String)
+  case class LogFile(name: String)
+
+  test("SPARK-3414 regression: should store analyzed logical plan when 
registering a temp table") {
+sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
+sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
+
+sql(
+  """
+  SELECT name, message
+  FROM rawLogs
+  JOIN (
+SELECT name
+FROM logFiles
+  ) files
+  ON rawLogs.filename = files.name
+  """).registerTempTable("boom")
+
+// This should be successfully analyzed
+sql("SELECT * FROM boom").queryExecution.analyzed
+  }
+
   test("parse HQL set commands") {
 // Adapted from its SQL counterpart.
 val testKey = "spark.sql.key.usedfortestonly"





git commit: SPARK-3423: [SQL] Implement BETWEEN for SQLParser

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 50a4fa774 -> ca0348e68


SPARK-3423:  [SQL] Implement BETWEEN for SQLParser

This patch improves the SQLParser by adding support for BETWEEN conditions.
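
The rule added in the diff below desugars `e BETWEEN lo AND hi` into a conjunction
of two comparisons. A minimal sketch of that rewrite on a toy expression tree (not
Catalyst's `Expression` hierarchy):

```scala
// Toy expression tree illustrating the BETWEEN desugaring.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr
case class LessThanOrEqual(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object BetweenSketch extends App {
  // `e BETWEEN lo AND hi`  ==>  `e >= lo AND e <= hi`
  def between(e: Expr, lo: Expr, hi: Expr): Expr =
    And(GreaterThanOrEqual(e, lo), LessThanOrEqual(e, hi))

  // e.g. WHERE key BETWEEN 5 AND 7
  println(between(Col("key"), Lit(5), Lit(7)))
  // And(GreaterThanOrEqual(Col(key),Lit(5)),LessThanOrEqual(Col(key),Lit(7)))
}
```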

Author: William Benton 

Closes #2295 from willb/sql-between and squashes the following commits:

0016d30 [William Benton] Implement BETWEEN for SQLParser


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca0348e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca0348e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca0348e6

Branch: refs/heads/master
Commit: ca0348e68213c2c7589f2018ebf9d889c0ce59c3
Parents: 50a4fa7
Author: William Benton 
Authored: Mon Sep 8 19:05:02 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 19:05:02 2014 -0700

--
 .../org/apache/spark/sql/catalyst/SqlParser.scala |  4 
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 18 ++
 2 files changed, 22 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca0348e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index a88bd85..bfc197c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -73,6 +73,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val ASC = Keyword("ASC")
   protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AVG = Keyword("AVG")
+  protected val BETWEEN = Keyword("BETWEEN")
   protected val BY = Keyword("BY")
   protected val CACHE = Keyword("CACHE")
   protected val CAST = Keyword("CAST")
@@ -272,6 +273,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
 termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } |
 termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
 termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
+termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
+  case e ~ _ ~ el ~ _  ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu))
+} |
 termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } |
 termExpression ~ REGEXP ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } |
 termExpression ~ LIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => Like(e1, e2) } |

http://git-wip-us.apache.org/repos/asf/spark/blob/ca0348e6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e8fbc28..45c0ca8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -597,4 +597,22 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   (3, null) ::
   (4, 2147483644) :: Nil)
   }
+  
+  test("SPARK-3423 BETWEEN") {
+checkAnswer(
+  sql("SELECT key, value FROM testData WHERE key BETWEEN 5 and 7"),
+  Seq((5, "5"), (6, "6"), (7, "7"))
+)
+
+checkAnswer(
+  sql("SELECT key, value FROM testData WHERE key BETWEEN 7 and 7"),
+  Seq((7, "7"))
+)
+
+checkAnswer(
+  sql("SELECT key, value FROM testData WHERE key BETWEEN 9 and 7"),
+  Seq()
+)
+
+  }
 }





git commit: [SPARK-3443][MLLIB] update default values of tree:

2014-09-08 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 7db53391f -> 50a4fa774


[SPARK-3443][MLLIB] update default values of tree:

Adjust the default values of the decision tree, based on the memory requirements 
discussed in https://github.com/apache/spark/pull/2125:

1. maxMemoryInMB: 128 -> 256
2. maxBins: 100 -> 32
3. maxDepth: 4 -> 5 (in some example code)

jkbradley
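
For reference, a classification call using the new defaults quoted above, in the
style of the guide snippets further down in this diff (assumes a SparkContext `sc`
and the sample data file bundled with the Spark distribution):

```scala
// Hedged example mirroring the updated guide snippets; assumes `sc` and the sample file exist.
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache()
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()   // empty map: all features continuous
val impurity = "gini"
val maxDepth = 5    // example code moves 4 -> 5
val maxBins = 32    // default moves 100 -> 32

val model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo,
  impurity, maxDepth, maxBins)
```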

Author: Xiangrui Meng 

Closes #2322 from mengxr/tree-defaults and squashes the following commits:

cda453a [Xiangrui Meng] fix tests
5900445 [Xiangrui Meng] update comments
8c81831 [Xiangrui Meng] update default values of tree:


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50a4fa77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50a4fa77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50a4fa77

Branch: refs/heads/master
Commit: 50a4fa774a0e8a17d7743b33ce8941bf4041144d
Parents: 7db5339
Author: Xiangrui Meng 
Authored: Mon Sep 8 18:59:57 2014 -0700
Committer: Xiangrui Meng 
Committed: Mon Sep 8 18:59:57 2014 -0700

--
 docs/mllib-decision-tree.md   | 16 
 .../spark/examples/mllib/JavaDecisionTree.java|  2 +-
 .../spark/examples/mllib/DecisionTreeRunner.scala |  4 ++--
 .../apache/spark/mllib/tree/DecisionTree.scala|  8 
 .../spark/mllib/tree/configuration/Strategy.scala |  6 +++---
 .../spark/mllib/tree/DecisionTreeSuite.scala  | 18 --
 python/pyspark/mllib/tree.py  |  4 ++--
 7 files changed, 24 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/50a4fa77/docs/mllib-decision-tree.md
--
diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md
index 1166d9c..12a6afb 100644
--- a/docs/mllib-decision-tree.md
+++ b/docs/mllib-decision-tree.md
@@ -80,7 +80,7 @@ The ordered splits create "bins" and the maximum number of such
 bins can be specified using the `maxBins` parameter.
 
 Note that the number of bins cannot be greater than the number of instances `$N$` (a rare scenario
-since the default `maxBins` value is 100). The tree algorithm automatically reduces the number of
+since the default `maxBins` value is 32). The tree algorithm automatically reduces the number of
 bins if the condition is not satisfied.
 
 **Categorical features**
@@ -117,7 +117,7 @@ all nodes at each level of the tree. This could lead to high memory requirements
 of the tree, potentially leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB`
 training parameter specifies the maximum amount of memory at the workers (twice as much at the
 master) to be allocated to the histogram computation. The default value is conservatively chosen to
-be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements
+be 256 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements
 for a level-wise computation cross the `maxMemoryInMB` threshold, the node training tasks at each
 subsequent level are split into smaller tasks.
 
@@ -167,7 +167,7 @@ val numClasses = 2
 val categoricalFeaturesInfo = Map[Int, Int]()
 val impurity = "gini"
 val maxDepth = 5
-val maxBins = 100
+val maxBins = 32
 
 val model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity,
   maxDepth, maxBins)
@@ -213,7 +213,7 @@ Integer numClasses = 2;
 HashMap categoricalFeaturesInfo = new HashMap();
 String impurity = "gini";
 Integer maxDepth = 5;
-Integer maxBins = 100;
+Integer maxBins = 32;
 
 // Train a DecisionTree model for classification.
 final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses,
@@ -250,7 +250,7 @@ data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache()
 # Train a DecisionTree model.
 #  Empty categoricalFeaturesInfo indicates all features are continuous.
 model = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={},
- impurity='gini', maxDepth=5, maxBins=100)
+ impurity='gini', maxDepth=5, maxBins=32)
 
 # Evaluate model on training instances and compute training error
 predictions = model.predict(data.map(lambda x: x.features))
@@ -293,7 +293,7 @@ val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache
 val categoricalFeaturesInfo = Map[Int, Int]()
 val impurity = "variance"
 val maxDepth = 5
-val maxBins = 100
+val maxBins = 32
 
 val model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity,
   maxDepth, maxBins)
@@ -338,7 +338,7 @@ JavaSparkContext sc = new JavaSparkContext(sparkConf);
 HashMap categoricalFeaturesInfo = new HashMap();
 String impurit

git commit: [SPARK-3349][SQL] Output partitioning of limit should not be inherited from child

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 08ce18881 -> 7db53391f


[SPARK-3349][SQL] Output partitioning of limit should not be inherited from 
child

This resolves https://issues.apache.org/jira/browse/SPARK-3349
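
A minimal sketch of the fix's idea on a toy operator hierarchy (not the real
`SparkPlan` API): a limit collapses its input to one partition, so it must report
`SinglePartition` itself rather than inherit whatever its child claims.

```scala
// Toy physical-operator sketch; the real classes live in Spark SQL's execution layer.
sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(numPartitions: Int) extends Partitioning

trait PhysicalOp { def outputPartitioning: Partitioning }

case class ShuffledScan(numPartitions: Int) extends PhysicalOp {
  def outputPartitioning: Partitioning = HashPartitioning(numPartitions)
}

case class Limit(limit: Int, child: PhysicalOp) extends PhysicalOp {
  // Before the fix this was effectively `child.outputPartitioning`, which let a
  // downstream join assume data was still hash-partitioned after the limit.
  def outputPartitioning: Partitioning = SinglePartition
}

object LimitSketch extends App {
  println(Limit(2, ShuffledScan(200)).outputPartitioning)  // SinglePartition
}
```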

Author: Eric Liang 

Closes #2262 from ericl/spark-3349 and squashes the following commits:

3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db53391
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db53391
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db53391

Branch: refs/heads/master
Commit: 7db53391f1b349d1f49844197b34f94806f5e336
Parents: 08ce188
Author: Eric Liang 
Authored: Mon Sep 8 16:14:32 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 16:14:36 2014 -0700

--
 .../spark/sql/execution/basicOperators.scala   |  4 +++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 +
 2 files changed, 20 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 47bff0c..cac3766 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan)
   private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
   override def output = child.output
+  override def outputPartitioning = SinglePartition
 
   /**
 * A custom implementation modeled after the take function on RDDs but which never runs any job
@@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan)
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
 
   override def output = child.output
+  override def outputPartitioning = SinglePartition
 
   val ordering = new RowOrdering(sortOrder, child.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1ac2059..e8fbc28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   (null, null, 6, "F") :: Nil)
   }
 
+  test("SPARK-3349 partitioning after limit") {
+sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
+  .limit(2)
+  .registerTempTable("subset1")
+sql("SELECT DISTINCT n FROM lowerCaseData")
+  .limit(2)
+  .registerTempTable("subset2")
+checkAnswer(
+  sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = 
lowerCaseData.n"),
+  (3, "c", 3) ::
+  (4, "d", 4) :: Nil)
+checkAnswer(
+  sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = 
lowerCaseData.n"),
+  (1, "a", 1) ::
+  (2, "b", 2) :: Nil)
+  }
+
   test("mixed-case keywords") {
 checkAnswer(
   sql(





[5/5] git commit: [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

2014-09-08 Thread rxin
[SPARK-3019] Pluggable block transfer interface (BlockTransferService)

This pull request creates a new BlockTransferService interface for block 
fetch/upload and refactors the existing ConnectionManager to implement 
BlockTransferService (NioBlockTransferService).

Most of the changes are simply moving code around. The main class to inspect is 
ShuffleBlockFetcherIterator.

Review guide:
- Most of the ConnectionManager code is now in network.cm package
- ManagedBuffer is a new buffer abstraction backed by several different 
implementations (file segment, nio ByteBuffer, Netty ByteBuf)
- BlockTransferService is the main internal interface introduced in this PR
- NioBlockTransferService implements BlockTransferService and replaces the old 
BlockManagerWorker
- ShuffleBlockFetcherIterator replaces the old BlockFetcherIterator to use the 
new interface

TODOs that should be separate PRs:
- Implement NettyBlockTransferService
- Finalize the API/semantics for ManagedBuffer.release()
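
To make the "pluggable" part concrete, an illustrative-only sketch of what such a
transfer interface can look like. Apart from init/hostName/port, which are visible
in the BlockManager diff later in this thread, the method names and signatures
below are assumptions, not the actual Spark interface.

```scala
// Illustrative shape of a pluggable block transfer service; not Spark's real API.
import java.nio.ByteBuffer
import scala.concurrent.Future

trait BlockDataProvider {
  def getBlockData(blockId: String): Option[ByteBuffer]   // local block lookup
}

trait SketchBlockTransferService {
  def init(provider: BlockDataProvider): Unit             // wire in local block lookups
  def hostName: String
  def port: Int
  def fetchBlock(host: String, port: Int, blockId: String): Future[ByteBuffer]
  def uploadBlock(host: String, port: Int, blockId: String, data: ByteBuffer): Future[Unit]
}

// An NIO- or Netty-backed class can then implement this trait, and the block
// manager depends only on the trait -- which is the pluggability described above.
```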

Author: Reynold Xin 

Closes #2240 from rxin/blockTransferService and squashes the following commits:

64cd9d7 [Reynold Xin] Merge branch 'master' into blockTransferService
1dfd3d7 [Reynold Xin] Limit the length of the FileInputStream.
1332156 [Reynold Xin] Fixed style violation from refactoring.
2960c93 [Reynold Xin] Added ShuffleBlockFetcherIteratorSuite.
e29c721 [Reynold Xin] Updated comment for ShuffleBlockFetcherIterator.
8a1046e [Reynold Xin] Code review feedback:
2c6b1e1 [Reynold Xin] Removed println in test cases.
2a907e4 [Reynold Xin] Merge branch 'master' into blockTransferService-merge
07ccf0d [Reynold Xin] Added init check to CMBlockTransferService.
98c668a [Reynold Xin] Added failure handling and fixed unit tests.
ae05fcd [Reynold Xin] Updated tests, although DistributedSuite is hanging.
d8d595c [Reynold Xin] Merge branch 'master' of github.com:apache/spark into 
blockTransferService
9ef279c [Reynold Xin] Initial refactoring to move ConnectionManager to use the 
BlockTransferService.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08ce1888
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08ce1888
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08ce1888

Branch: refs/heads/master
Commit: 08ce18881e09c6e91db9c410d1d9ce1e5ae63a62
Parents: 939a322
Author: Reynold Xin 
Authored: Mon Sep 8 15:59:20 2014 -0700
Committer: Reynold Xin 
Committed: Mon Sep 8 15:59:20 2014 -0700

--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   15 +-
 .../apache/spark/network/BlockDataManager.scala |   36 +
 .../spark/network/BlockFetchingListener.scala   |   37 +
 .../spark/network/BlockTransferService.scala|  131 +++
 .../apache/spark/network/BufferMessage.scala|  113 --
 .../org/apache/spark/network/Connection.scala   |  587 --
 .../org/apache/spark/network/ConnectionId.scala |   34 -
 .../spark/network/ConnectionManager.scala   | 1047 --
 .../spark/network/ConnectionManagerId.scala |   37 -
 .../spark/network/ConnectionManagerTest.scala   |  103 --
 .../apache/spark/network/ManagedBuffer.scala|  107 ++
 .../org/apache/spark/network/Message.scala  |   95 --
 .../org/apache/spark/network/MessageChunk.scala |   41 -
 .../spark/network/MessageChunkHeader.scala  |   82 --
 .../org/apache/spark/network/ReceiverTest.scala |   37 -
 .../apache/spark/network/SecurityMessage.scala  |  162 ---
 .../org/apache/spark/network/SenderTest.scala   |   76 --
 .../apache/spark/network/nio/BlockMessage.scala |  197 
 .../spark/network/nio/BlockMessageArray.scala   |  160 +++
 .../spark/network/nio/BufferMessage.scala   |  114 ++
 .../apache/spark/network/nio/Connection.scala   |  587 ++
 .../apache/spark/network/nio/ConnectionId.scala |   34 +
 .../spark/network/nio/ConnectionManager.scala   | 1042 +
 .../spark/network/nio/ConnectionManagerId.scala |   37 +
 .../org/apache/spark/network/nio/Message.scala  |   96 ++
 .../apache/spark/network/nio/MessageChunk.scala |   41 +
 .../spark/network/nio/MessageChunkHeader.scala  |   81 ++
 .../network/nio/NioBlockTransferService.scala   |  205 
 .../spark/network/nio/SecurityMessage.scala |  160 +++
 .../spark/serializer/KryoSerializer.scala   |2 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |   35 +-
 .../shuffle/IndexShuffleBlockManager.scala  |   24 +-
 .../spark/shuffle/ShuffleBlockManager.scala |6 +-
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |   14 +-
 .../spark/shuffle/hash/HashShuffleReader.scala  |4 +-
 .../spark/storage/BlockFetcherIterator.scala|  254 -
 .../org/apache/spark/storage/BlockManager.scala |   98 +-
 .../apache/spark/storage/BlockManagerId.scala   |4 +-
 .../spark/storage/BlockManagerWorker.scala  |  147 ---
 .../org/apache/spark/storage/BlockMessage.scala |  209 
 .../spark/storage/BlockMessa

[4/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

2014-09-08 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
deleted file mode 100644
index 4894ecd..000
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.nio.ByteBuffer
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.io.Source
-
-import org.apache.spark._
-
-private[spark] object ConnectionManagerTest extends Logging{
-  def main(args: Array[String]) {
-//  - the master URL  - a list slaves to run 
connectionTest on
-// [num of tasks] - the number of parallel tasks to be initiated default 
is number of slave
-// hosts [size of msg in MB (integer)] - the size of messages to be sent 
in each task,
-// default is 10 [count] - how many times to run, default is 3 [await time 
in seconds] :
-// await time (in seconds), default is 600
-if (args.length < 2) {
-  println("Usage: ConnectionManagerTest   [num 
of tasks] " +
-"[size of msg in MB (integer)] [count] [await time in seconds)] ")
-  System.exit(1)
-}
-
-if (args(0).startsWith("local")) {
-  println("This runs only on a mesos cluster")
-}
-
-val sc = new SparkContext(args(0), "ConnectionManagerTest")
-val slavesFile = Source.fromFile(args(1))
-val slaves = slavesFile.mkString.split("\n")
-slavesFile.close()
-
-/* println("Slaves") */
-/* slaves.foreach(println) */
-val tasknum = if (args.length > 2) args(2).toInt else slaves.length
-val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
-val count = if (args.length > 4) args(4).toInt else 3
-val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
-println("Running " + count + " rounds of test: " + "parallel tasks = " + 
tasknum + ", " +
-  "msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
-val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
-i => SparkEnv.get.connectionManager.id).collect()
-println("\nSlave ConnectionManagerIds")
-slaveConnManagerIds.foreach(println)
-println
-
-(0 until count).foreach(i => {
-  val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
-val connManager = SparkEnv.get.connectionManager
-val thisConnManagerId = connManager.id
-connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) 
=> {
-  logInfo("Received [" + msg + "] from [" + id + "]")
-  None
-})
-
-val buffer = 
ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-buffer.flip
-
-val startTime = System.currentTimeMillis
-val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ 
slaveConnManagerId =>
-  {
-val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-logInfo("Sending [" + bufferMessage + "] to [" + 
slaveConnManagerId + "]")
-connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
-  }
-}
-val results = futures.map(f => Await.result(f, awaitTime))
-val finishTime = System.currentTimeMillis
-Thread.sleep(5000)
-
-val mb = size * results.size / 1024.0 / 1024.0
-val ms = finishTime - startTime
-val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " 
ms at " + (mb / ms *
-  1000.0) + " MB/s"
-logInfo(resultStr)
-resultStr
-  }).collect()
-
-  println("-")
-  println("Run " + i)
-  resultStrs.foreach(println)
-  println("-")
-})
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
-

[3/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

2014-09-08 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
new file mode 100644
index 000..09d3ea3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -0,0 +1,1042 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.nio
+
+import java.io.IOException
+import java.net._
+import java.nio._
+import java.nio.channels._
+import java.nio.channels.spi._
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
+import java.util.{Timer, TimerTask}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, 
SynchronizedMap, SynchronizedQueue}
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.language.postfixOps
+
+import org.apache.spark._
+import org.apache.spark.util.{SystemClock, Utils}
+
+
+private[nio] class ConnectionManager(
+port: Int,
+conf: SparkConf,
+securityManager: SecurityManager,
+name: String = "Connection manager")
+  extends Logging {
+
+  /**
+   * Used by sendMessageReliably to track messages being sent.
+   * @param message the message that was sent
+   * @param connectionManagerId the connection manager that sent this message
+   * @param completionHandler callback that's invoked when the send has 
completed or failed
+   */
+  class MessageStatus(
+  val message: Message,
+  val connectionManagerId: ConnectionManagerId,
+  completionHandler: MessageStatus => Unit) {
+
+/** This is non-None if message has been ack'd */
+var ackMessage: Option[Message] = None
+
+def markDone(ackMessage: Option[Message]) {
+  this.ackMessage = ackMessage
+  completionHandler(this)
+}
+  }
+
+  private val selector = SelectorProvider.provider.openSelector()
+  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
+
+  // default to 30 second timeout waiting for authentication
+  private val authTimeout = 
conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+  private val ackTimeout = 
conf.getInt("spark.core.connection.ack.wait.timeout", 60)
+
+  private val handleMessageExecutor = new ThreadPoolExecutor(
+conf.getInt("spark.core.connection.handler.threads.min", 20),
+conf.getInt("spark.core.connection.handler.threads.max", 60),
+conf.getInt("spark.core.connection.handler.threads.keepalive", 60), 
TimeUnit.SECONDS,
+new LinkedBlockingDeque[Runnable](),
+Utils.namedThreadFactory("handle-message-executor"))
+
+  private val handleReadWriteExecutor = new ThreadPoolExecutor(
+conf.getInt("spark.core.connection.io.threads.min", 4),
+conf.getInt("spark.core.connection.io.threads.max", 32),
+conf.getInt("spark.core.connection.io.threads.keepalive", 60), 
TimeUnit.SECONDS,
+new LinkedBlockingDeque[Runnable](),
+Utils.namedThreadFactory("handle-read-write-executor"))
+
+  // Use a different, yet smaller, thread pool - infrequently used with very 
short lived tasks :
+  // which should be executed asap
+  private val handleConnectExecutor = new ThreadPoolExecutor(
+conf.getInt("spark.core.connection.connect.threads.min", 1),
+conf.getInt("spark.core.connection.connect.threads.max", 8),
+conf.getInt("spark.core.connection.connect.threads.keepalive", 60), 
TimeUnit.SECONDS,
+new LinkedBlockingDeque[Runnable](),
+Utils.namedThreadFactory("handle-connect-executor"))
+
+  private val serverChannel = ServerSocketChannel.open()
+  // used to track the SendingConnections waiting to do SASL negotiation
+  private val connectionsAwaitingSasl = new HashMap[ConnectionId, 
SendingConnection]
+with SynchronizedMap[ConnectionId, SendingConnection]
+  private val connectionsByKey =
+new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, 
Connection]
+  private val connectionsById = new HashMap[Conne

[2/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

2014-09-08 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a714142..d1bee3d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
 import java.io.{File, InputStream, OutputStream, BufferedOutputStream, 
ByteArrayOutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
+import scala.concurrent.ExecutionContext.Implicits.global
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
@@ -58,18 +60,14 @@ private[spark] class BlockManager(
 defaultSerializer: Serializer,
 maxMemory: Long,
 val conf: SparkConf,
-securityManager: SecurityManager,
 mapOutputTracker: MapOutputTracker,
-shuffleManager: ShuffleManager)
-  extends BlockDataProvider with Logging {
+shuffleManager: ShuffleManager,
+blockTransferService: BlockTransferService)
+  extends BlockDataManager with Logging {
 
-  private val port = conf.getInt("spark.blockManager.port", 0)
+  blockTransferService.init(this)
 
   val diskBlockManager = new DiskBlockManager(this, conf)
-  val connectionManager =
-new ConnectionManager(port, conf, securityManager, "Connection manager for 
block manager")
-
-  implicit val futureExecContext = connectionManager.futureExecContext
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
@@ -89,11 +87,7 @@ private[spark] class BlockManager(
   }
 
   val blockManagerId = BlockManagerId(
-executorId, connectionManager.id.host, connectionManager.id.port)
-
-  // Max megabytes of data to keep in flight per reducer (to avoid 
over-allocating memory
-  // for receiving shuffle outputs)
-  val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 
1024 * 1024
+executorId, blockTransferService.hostName, blockTransferService.port)
 
   // Whether to compress broadcast variables that are stored
   private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", 
true)
@@ -136,11 +130,11 @@ private[spark] class BlockManager(
   master: BlockManagerMaster,
   serializer: Serializer,
   conf: SparkConf,
-  securityManager: SecurityManager,
   mapOutputTracker: MapOutputTracker,
-  shuffleManager: ShuffleManager) = {
+  shuffleManager: ShuffleManager,
+  blockTransferService: BlockTransferService) = {
 this(execId, actorSystem, master, serializer, 
BlockManager.getMaxMemory(conf),
-  conf, securityManager, mapOutputTracker, shuffleManager)
+  conf, mapOutputTracker, shuffleManager, blockTransferService)
   }
 
   /**
@@ -149,7 +143,6 @@ private[spark] class BlockManager(
*/
   private def initialize(): Unit = {
 master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
-BlockManagerWorker.startBlockManagerWorker(this)
   }
 
   /**
@@ -212,21 +205,34 @@ private[spark] class BlockManager(
 }
   }
 
-  override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] 
= {
+  /**
+   * Interface to get local block data.
+   *
+   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   */
+  override def getBlockData(blockId: String): Option[ManagedBuffer] = {
 val bid = BlockId(blockId)
 if (bid.isShuffle) {
-  
shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId])
+  
Some(shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]))
 } else {
   val blockBytesOpt = doGetLocal(bid, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
   if (blockBytesOpt.isDefined) {
-Right(blockBytesOpt.get)
+val buffer = blockBytesOpt.get
+Some(new NioByteBufferManagedBuffer(buffer))
   } else {
-throw new BlockNotFoundException(blockId)
+None
   }
 }
   }
 
   /**
+   * Put the block locally, using the given storage level.
+   */
+  override def putBlockData(blockId: String, data: ManagedBuffer, level: 
StorageLevel): Unit = {
+putBytes(BlockId(blockId), data.nioByteBuffer(), level)
+  }
+
+  /**
* Get the BlockStatus for the block identified by the given ID, if it 
exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from 
Tachyon.
*/
@@ -333,16 +339,10 @@ private[spark] class BlockManager(
* shuffle blocks. It is safe to do so without a lock on block info since 
disk store
* never deletes (recent) items.
*/
-  def getLocalShuffleFromDisk(
-  blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-
-val shuffleBlockManager = shuffleManager.shuffleBlock

[1/5] [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

2014-09-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 939a322c8 -> 08ce18881


http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c200654..e251660 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,15 +21,19 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.network.nio.NioBlockTransferService
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
 import akka.actor._
 import akka.pattern.ask
 import akka.util.Timeout
-import org.apache.spark.shuffle.hash.HashShuffleManager
 
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{doAnswer, mock, spy, when}
-import org.mockito.stubbing.Answer
+import org.mockito.Mockito.{mock, when}
 
 import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
@@ -38,18 +42,12 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.network.{Message, ConnectionManagerId}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
 
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.language.implicitConversions
-import scala.language.postfixOps
-import org.apache.spark.shuffle.ShuffleBlockManager
 
 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   with PrivateMethodTester {
@@ -74,8 +72,9 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
   private def makeBlockManager(maxMem: Long, name: String = ""): 
BlockManager = {
-new BlockManager(name, actorSystem, master, serializer, maxMem, conf, 
securityMgr,
-  mapOutputTracker, shuffleManager)
+val transfer = new NioBlockTransferService(conf, securityMgr)
+new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
+  mapOutputTracker, shuffleManager, transfer)
   }
 
   before {
@@ -793,8 +792,9 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
 
   test("block store put failure") {
 // Use Java serializer so we can create an unserializable error.
+val transfer = new NioBlockTransferService(conf, securityMgr)
 store = new BlockManager("", actorSystem, master, new 
JavaSerializer(conf), 1200, conf,
-  securityMgr, mapOutputTracker, shuffleManager)
+  mapOutputTracker, shuffleManager, transfer)
 
 // The put should fail since a1 is not serializable.
 class UnserializableClass
@@ -1005,109 +1005,6 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
 assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
   }
 
-  test("return error message when error occurred in 
BlockManagerWorker#onBlockMessageReceive") {
-store = new BlockManager("", actorSystem, master, serializer, 
1200, conf,
-  securityMgr, mapOutputTracker, shuffleManager)
-
-val worker = spy(new BlockManagerWorker(store))
-val connManagerId = mock(classOf[ConnectionManagerId])
-
-// setup request block messages
-val reqBlId1 = ShuffleBlockId(0,0,0)
-val reqBlId2 = ShuffleBlockId(0,1,0)
-val reqBlockMessage1 = BlockMessage.fromGetBlock(GetBlock(reqBlId1))
-val reqBlockMessage2 = BlockMessage.fromGetBlock(GetBlock(reqBlId2))
-val reqBlockMessages = new BlockMessageArray(
-  Seq(reqBlockMessage1, reqBlockMessage2))
-val reqBufferMessage = reqBlockMessages.toBufferMessage
-
-val answer = new Answer[Option[BlockMessage]] {
-  override def answer(invocation: InvocationOnMock)
-  :Option[BlockMessage]= {
-throw new Exception
-  }
-}
-
-doAnswer(answer).when(worker).processBlockMessage(any())
-
-// Test when exception was thrown during processing block messages
-var ackMessage = worker.onBlockMessageReceive(reqBufferMessage, 
connManagerId)
-
-assert(ackMessage.isDefined, "When Exception was thrown in "

git commit: [SPARK-3417] Use new-style classes in PySpark

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 26bc7655d -> 939a322c8


[SPARK-3417] Use new-style classes in PySpark

Tiny PR making SQLContext a new-style class.  This allows various type logic to 
work more effectively

```Python
In [1]: import pyspark

In [2]: pyspark.sql.SQLContext.mro()
Out[2]: [pyspark.sql.SQLContext, object]
```

Author: Matthew Rocklin 

Closes #2288 from mrocklin/sqlcontext-new-style-class and squashes the 
following commits:

4aadab6 [Matthew Rocklin] update other old-style classes
a2dc02f [Matthew Rocklin] pyspark.sql.SQLContext is new-style class


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/939a322c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/939a322c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/939a322c

Branch: refs/heads/master
Commit: 939a322c85956eda150b10afb2ed1d8d959a7bdf
Parents: 26bc765
Author: Matthew Rocklin 
Authored: Mon Sep 8 15:45:28 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 15:45:36 2014 -0700

--
 python/pyspark/mllib/random.py | 2 +-
 python/pyspark/mllib/util.py   | 2 +-
 python/pyspark/sql.py  | 2 +-
 python/pyspark/storagelevel.py | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/mllib/random.py
--
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 3e59c73..d53c95f 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -28,7 +28,7 @@ from pyspark.serializers import NoOpSerializer
 __all__ = ['RandomRDDs', ]
 
 
-class RandomRDDs:
+class RandomRDDs(object):
 """
 Generator methods for creating RDDs comprised of i.i.d samples from
 some distribution.

http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/mllib/util.py
--
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 4962d05..1c7b8c8 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -25,7 +25,7 @@ from pyspark.rdd import RDD
 from pyspark.serializers import NoOpSerializer
 
 
-class MLUtils:
+class MLUtils(object):
 
 """
 Helper methods to load, save and pre-process data used in MLlib.

http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 004d493..53eea6d 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -899,7 +899,7 @@ def _create_cls(dataType):
 return Row
 
 
-class SQLContext:
+class SQLContext(object):
 
 """Main entry point for Spark SQL functionality.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/939a322c/python/pyspark/storagelevel.py
--
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 2aa0fb9..676aa0f 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -18,7 +18,7 @@
 __all__ = ["StorageLevel"]
 
 
-class StorageLevel:
+class StorageLevel(object):
 
 """
 Flags for controlling the storage of an RDD. Each StorageLevel records 
whether to use memory,





git commit: [SQL] Minor edits to sql programming guide.

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 8c6306a03 -> 7a236dcf8


[SQL] Minor edits to sql programming guide.

Author: Henry Cook 

Closes #2316 from hcook/sql-docs and squashes the following commits:

373f94b [Henry Cook] Minor edits to sql programming guide.

(cherry picked from commit 26bc7655de18ab0191ded3f75cb77bc756dc1c03)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a236dcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a236dcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a236dcf

Branch: refs/heads/branch-1.1
Commit: 7a236dcf8e4721472cea6f1ae7b652618c118f43
Parents: 8c6306a
Author: Henry Cook 
Authored: Mon Sep 8 14:56:37 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 14:56:53 2014 -0700

--
 docs/sql-programming-guide.md | 92 +++---
 1 file changed, 47 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a236dcf/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 1814fef..d83efa4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -13,10 +13,10 @@ title: Spark SQL Programming Guide
 
 Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be 
executed using
 Spark.  At the core of this component is a new type of RDD,
-[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD).  SchemaRDDs 
are composed
-[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) 
objects along with
+[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD).  SchemaRDDs 
are composed of
+[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) 
objects, along with
 a schema that describes the data types of each column in the row.  A SchemaRDD 
is similar to a table
-in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark 
distribution and can be run in the `spark-shell`.
@@ -26,10 +26,10 @@ All of the examples on this page use sample data included 
in the Spark distribut
 
 Spark SQL allows relational queries expressed in SQL or HiveQL to be executed 
using
 Spark.  At the core of this component is a new type of RDD,
-[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD).
  JavaSchemaRDDs are composed
-[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along 
with
+[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD).
  JavaSchemaRDDs are composed of
+[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along 
with
 a schema that describes the data types of each column in the row.  A 
JavaSchemaRDD is similar to a table
-in a traditional relational database.  A JavaSchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A JavaSchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 
@@ -37,10 +37,10 @@ file, a JSON dataset, or by running HiveQL against data 
stored in [Apache Hive](
 
 Spark SQL allows relational queries expressed in SQL or HiveQL to be executed 
using
 Spark.  At the core of this component is a new type of RDD,
-[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html).  SchemaRDDs are 
composed
-[Row](api/python/pyspark.sql.Row-class.html) objects along with
+[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html).  SchemaRDDs are 
composed of
+[Row](api/python/pyspark.sql.Row-class.html) objects, along with
 a schema that describes the data types of each column in the row.  A SchemaRDD 
is similar to a table
-in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark 
distribution and can be run in the `pyspark` shell.
@@ -68,11 +68,11 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.createSchemaRDD
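
Following on from the setup shown in the hunk above, here is a minimal sketch of creating a SchemaRDD from an existing RDD in the Scala shell, as the revised guide text describes; the Person case class and the people.txt path are illustrative assumptions, not part of this commit:

    // Scala, in spark-shell (Spark 1.1); `sc` is the pre-built SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit conversion RDD -> SchemaRDD

    // Hypothetical case class describing one row of the data.
    case class Person(name: String, age: Int)

    // Build a SchemaRDD from an existing RDD of case-class objects.
    val people = sc.textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))

    people.registerTempTable("people")
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)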

git commit: [SQL] Minor edits to sql programming guide.

2014-09-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 386bc24eb -> 26bc7655d


[SQL] Minor edits to sql programming guide.

Author: Henry Cook 

Closes #2316 from hcook/sql-docs and squashes the following commits:

373f94b [Henry Cook] Minor edits to sql programming guide.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26bc7655
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26bc7655
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26bc7655

Branch: refs/heads/master
Commit: 26bc7655de18ab0191ded3f75cb77bc756dc1c03
Parents: 386bc24
Author: Henry Cook 
Authored: Mon Sep 8 14:56:37 2014 -0700
Committer: Michael Armbrust 
Committed: Mon Sep 8 14:56:37 2014 -0700

--
 docs/sql-programming-guide.md | 92 +++---
 1 file changed, 47 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26bc7655/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 1814fef..d83efa4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -13,10 +13,10 @@ title: Spark SQL Programming Guide
 
 Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be 
executed using
 Spark.  At the core of this component is a new type of RDD,
-[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD).  SchemaRDDs 
are composed
-[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) 
objects along with
+[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD).  SchemaRDDs 
are composed of
+[Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) 
objects, along with
 a schema that describes the data types of each column in the row.  A SchemaRDD 
is similar to a table
-in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark 
distribution and can be run in the `spark-shell`.
@@ -26,10 +26,10 @@ All of the examples on this page use sample data included 
in the Spark distribut
 
 Spark SQL allows relational queries expressed in SQL or HiveQL to be executed 
using
 Spark.  At the core of this component is a new type of RDD,
-[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD).
  JavaSchemaRDDs are composed
-[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along 
with
+[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD).
  JavaSchemaRDDs are composed of
+[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along 
with
 a schema that describes the data types of each column in the row.  A 
JavaSchemaRDD is similar to a table
-in a traditional relational database.  A JavaSchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A JavaSchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 
@@ -37,10 +37,10 @@ file, a JSON dataset, or by running HiveQL against data 
stored in [Apache Hive](
 
 Spark SQL allows relational queries expressed in SQL or HiveQL to be executed 
using
 Spark.  At the core of this component is a new type of RDD,
-[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html).  SchemaRDDs are 
composed
-[Row](api/python/pyspark.sql.Row-class.html) objects along with
+[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html).  SchemaRDDs are 
composed of
+[Row](api/python/pyspark.sql.Row-class.html) objects, along with
 a schema that describes the data types of each column in the row.  A SchemaRDD 
is similar to a table
-in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, [Parquet](http://parquet.io)
+in a traditional relational database.  A SchemaRDD can be created from an 
existing RDD, a [Parquet](http://parquet.io)
 file, a JSON dataset, or by running HiveQL against data stored in [Apache 
Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark 
distribution and can be run in the `pyspark` shell.
@@ -68,11 +68,11 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.createSchemaRDD
 {% endhighlight %}
 
-In addition to the basic SQLContext, you can also create a HiveContext, which 
p

git commit: Provide a default PYSPARK_PYTHON for python/run_tests

2014-09-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 16a73c247 -> 386bc24eb


Provide a default PYSPARK_PYTHON for python/run_tests

Without this, the version of Python used in the tests is not recorded.
The error is:

   Testing with Python version:
   ./run-tests: line 57: --version: command not found

Author: Matthew Farrellee 

Closes #2300 from mattf/master-fix-python-run-tests and squashes the following 
commits:

65a09f5 [Matthew Farrellee] Provide a default PYSPARK_PYTHON for 
python/run_tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/386bc24e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/386bc24e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/386bc24e

Branch: refs/heads/master
Commit: 386bc24ebe3e75875b9647d9223c62d7b9dc9963
Parents: 16a73c2
Author: Matthew Farrellee 
Authored: Mon Sep 8 12:37:52 2014 -0700
Committer: Josh Rosen 
Committed: Mon Sep 8 12:37:52 2014 -0700

--
 python/run-tests | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/386bc24e/python/run-tests
--
diff --git a/python/run-tests b/python/run-tests
index 226e9e2..d98840d 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -50,6 +50,8 @@ function run_test() {
 
 echo "Running PySpark tests. Output is in python/unit-tests.log."
 
+export PYSPARK_PYTHON="python"
+
 # Try to test with Python 2.6, since that's the minimum version that we 
support:
 if [ $(which python2.6) ]; then
 export PYSPARK_PYTHON="python2.6"





git commit: SPARK-2978. Transformation with MR shuffle semantics

2014-09-08 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master e16a8e7db -> 16a73c247


SPARK-2978. Transformation with MR shuffle semantics

I didn't add this to the transformations list in the docs because it's kind of 
obscure, but would be happy to do so if others think it would be helpful.

Author: Sandy Ryza 

Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits:

4a5332a [Sandy Ryza] Fix Java test
c04b447 [Sandy Ryza] Fix Python doc and add back deleted code
433ad5b [Sandy Ryza] Add Java test
4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes
9b0ba99 [Sandy Ryza] Fix compilation
36e0571 [Sandy Ryza] Fix import ordering
48c12c2 [Sandy Ryza] Add Java version and additional doc
e5381cd [Sandy Ryza] Fix python style warnings
f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16a73c24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16a73c24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16a73c24

Branch: refs/heads/master
Commit: 16a73c2473181e03d88001aa3e08e6ffac92eb8b
Parents: e16a8e7
Author: Sandy Ryza 
Authored: Mon Sep 8 11:20:00 2014 -0700
Committer: Matei Zaharia 
Committed: Mon Sep 8 11:20:00 2014 -0700

--
 .../org/apache/spark/api/java/JavaPairRDD.scala | 26 +
 .../apache/spark/rdd/OrderedRDDFunctions.scala  | 14 -
 .../java/org/apache/spark/JavaAPISuite.java | 30 
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 14 +
 python/pyspark/rdd.py   | 24 
 python/pyspark/tests.py |  8 ++
 6 files changed, 115 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index feeb6c0..880f61c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -759,6 +759,32 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
+   * Repartition the RDD according to the given partitioner and, within each 
resulting partition,
+   * sort records by their keys.
+   *
+   * This is more efficient than calling `repartition` and then sorting within 
each partition
+   * because it can push the sorting down into the shuffle machinery.
+   */
+  def repartitionAndSortWithinPartitions(partitioner: Partitioner): 
JavaPairRDD[K, V] = {
+val comp = 
com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+repartitionAndSortWithinPartitions(partitioner, comp)
+  }
+
+  /**
+   * Repartition the RDD according to the given partitioner and, within each 
resulting partition,
+   * sort records by their keys.
+   *
+   * This is more efficient than calling `repartition` and then sorting within 
each partition
+   * because it can push the sorting down into the shuffle machinery.
+   */
+  def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: 
Comparator[K])
+: JavaPairRDD[K, V] = {
+implicit val ordering = comp // Allow implicit conversion of Comparator to 
Ordering.
+fromRDD(
+  new OrderedRDDFunctions[K, V, (K, 
V)](rdd).repartitionAndSortWithinPartitions(partitioner))
+  }
+
+  /**
* Sort the RDD by key, so that each partition contains a sorted range of 
the elements in
* ascending order. Calling `collect` or `save` on the resulting RDD will 
return or output an
* ordered list of records (in the `save` case, they will be written to 
multiple `part-X` files

http://git-wip-us.apache.org/repos/asf/spark/blob/16a73c24/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index e98bad2..d0dbfef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, RangePartitioner}
+import org.apache.spark.{Logging, Partitioner, RangePartitioner}
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -64,4 +64,16 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
 new ShuffledRDD[K, V, V](self, part)
   .setKeyOrdering(if (ascending) ordering else ordering.reverse)
   }
+
+  /

git commit: SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within.

2014-09-08 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 711356b42 -> e16a8e7db


SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within.

...

Tested. To be honest, it isn't a great idea to have a directory with spaces in
its name: Emacs doesn't like it, then Hadoop doesn't like it, and so on...

Author: Prashant Sharma 

Closes #2229 from ScrapCodes/SPARK-3337/quoting-shell-scripts and squashes the 
following commits:

d4ad660 [Prashant Sharma] SPARK-3337 Paranoid quoting in shell to allow install 
dirs with spaces within.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e16a8e7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e16a8e7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e16a8e7d

Branch: refs/heads/master
Commit: e16a8e7db5a3b1065b14baf89cb723a59b99226b
Parents: 711356b
Author: Prashant Sharma 
Authored: Mon Sep 8 10:24:15 2014 -0700
Committer: Andrew Or 
Committed: Mon Sep 8 10:24:15 2014 -0700

--
 bin/beeline  |  2 +-
 bin/compute-classpath.sh | 12 ++--
 bin/load-spark-env.sh|  4 ++--
 bin/pyspark  | 20 ++--
 bin/run-example  |  8 
 bin/spark-class  | 20 ++--
 bin/spark-shell  | 10 +-
 bin/spark-sql|  8 
 bin/spark-submit |  4 ++--
 dev/check-license| 16 
 dev/lint-python  |  6 +++---
 dev/mima |  4 ++--
 dev/run-tests|  2 +-
 dev/scalastyle   |  2 +-
 make-distribution.sh |  2 +-
 python/run-tests |  6 --
 sbin/slaves.sh   | 12 ++--
 sbin/spark-config.sh | 16 
 sbin/spark-daemon.sh | 20 ++--
 sbin/spark-executor  |  8 
 sbin/start-all.sh|  4 ++--
 sbin/start-history-server.sh |  4 ++--
 sbin/start-master.sh |  4 ++--
 sbin/start-slave.sh  |  4 ++--
 sbin/start-slaves.sh | 12 ++--
 sbin/start-thriftserver.sh   |  8 
 sbin/stop-all.sh |  4 ++--
 sbin/stop-history-server.sh  |  4 ++--
 sbt/sbt  | 20 ++--
 sbt/sbt-launch-lib.bash  | 12 ++--
 30 files changed, 130 insertions(+), 128 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e16a8e7d/bin/beeline
--
diff --git a/bin/beeline b/bin/beeline
index 1bda4db..3fcb6df 100755
--- a/bin/beeline
+++ b/bin/beeline
@@ -24,7 +24,7 @@
 set -o posix
 
 # Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 
 CLASS="org.apache.hive.beeline.BeeLine"
 exec "$FWDIR/bin/spark-class" $CLASS "$@"

http://git-wip-us.apache.org/repos/asf/spark/blob/e16a8e7d/bin/compute-classpath.sh
--
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 16b794a..15c6779 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -23,9 +23,9 @@
 SCALA_VERSION=2.10
 
 # Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 
-. $FWDIR/bin/load-spark-env.sh
+. "$FWDIR"/bin/load-spark-env.sh
 
 # Build up classpath
 CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
@@ -63,7 +63,7 @@ else
   assembly_folder="$ASSEMBLY_DIR"
 fi
 
-num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc 
-l)
+num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc 
-l)"
 if [ "$num_jars" -eq "0" ]; then
   echo "Failed to find Spark assembly in $assembly_folder"
   echo "You need to build Spark before running this program."
@@ -77,7 +77,7 @@ if [ "$num_jars" -gt "1" ]; then
   exit 1
 fi
 
-ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
+ASSEMBLY_JAR="$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)"
 
 # Verify that versions of java used to build the jars and run Spark are 
compatible
 jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
@@ -103,8 +103,8 @@ else
   datanucleus_dir="$FWDIR"/lib_managed/jars
 fi
 
-datanucleus_jars=$(find "$datanucleus_dir" 2>/dev/null | grep 
"datanucleus-.*\\.jar")
-datanucleus_jars=$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)
+datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep 
"datanucleus-.*\\.jar")"
+datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"
 
 if [ -n "$datanucleus_jars" ]; then
   hive_files=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 
2>/dev/null)

http://git-wip

[1/2] [SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib] DecisionTree aggregation improvements

2014-09-08 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 0d1cc4ae4 -> 711356b42


http://git-wip-us.apache.org/repos/asf/spark/blob/711356b4/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala
new file mode 100644
index 000..866d85a
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/DTStatsAggregator.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.tree.impl
+
+import org.apache.spark.mllib.tree.impurity._
+
+/**
+ * DecisionTree statistics aggregator.
+ * This holds a flat array of statistics for a set of (nodes, features, bins)
+ * and helps with indexing.
+ */
+private[tree] class DTStatsAggregator(
+val metadata: DecisionTreeMetadata,
+val numNodes: Int) extends Serializable {
+
+  /**
+   * [[ImpurityAggregator]] instance specifying the impurity type.
+   */
+  val impurityAggregator: ImpurityAggregator = metadata.impurity match {
+case Gini => new GiniAggregator(metadata.numClasses)
+case Entropy => new EntropyAggregator(metadata.numClasses)
+case Variance => new VarianceAggregator()
+case _ => throw new IllegalArgumentException(s"Bad impurity parameter: 
${metadata.impurity}")
+  }
+
+  /**
+   * Number of elements (Double values) used for the sufficient statistics of 
each bin.
+   */
+  val statsSize: Int = impurityAggregator.statsSize
+
+  val numFeatures: Int = metadata.numFeatures
+
+  /**
+   * Number of bins for each feature.  This is indexed by the feature index.
+   */
+  val numBins: Array[Int] = metadata.numBins
+
+  /**
+   * Number of splits for the given feature.
+   */
+  def numSplits(featureIndex: Int): Int = metadata.numSplits(featureIndex)
+
+  /**
+   * Indicator for each feature of whether that feature is an unordered 
feature.
+   * TODO: Is Array[Boolean] any faster?
+   */
+  def isUnordered(featureIndex: Int): Boolean = 
metadata.isUnordered(featureIndex)
+
+  /**
+   * Offset for each feature for calculating indices into the [[allStats]] 
array.
+   */
+  private val featureOffsets: Array[Int] = {
+def featureOffsetsCalc(total: Int, featureIndex: Int): Int = {
+  if (isUnordered(featureIndex)) {
+total + 2 * numBins(featureIndex)
+  } else {
+total + numBins(featureIndex)
+  }
+}
+Range(0, numFeatures).scanLeft(0)(featureOffsetsCalc).map(statsSize * 
_).toArray
+  }
+
+  /**
+   * Number of elements for each node, corresponding to stride between nodes 
in [[allStats]].
+   */
+  private val nodeStride: Int = featureOffsets.last
+
+  /**
+   * Total number of elements stored in this aggregator.
+   */
+  val allStatsSize: Int = numNodes * nodeStride
+
+  /**
+   * Flat array of elements.
+   * Index for start of stats for a (node, feature, bin) is:
+   *   index = nodeIndex * nodeStride + featureOffsets(featureIndex) + 
binIndex * statsSize
+   * Note: For unordered features, the left child stats have binIndex in [0, 
numBins(featureIndex))
+   *   and the right child stats in [numBins(featureIndex), 2 * 
numBins(featureIndex))
+   */
+  val allStats: Array[Double] = new Array[Double](allStatsSize)
+
+  /**
+   * Get an [[ImpurityCalculator]] for a given (node, feature, bin).
+   * @param nodeFeatureOffset  For ordered features, this is a pre-computed 
(node, feature) offset
+   *   from [[getNodeFeatureOffset]].
+   *   For unordered features, this is a pre-computed
+   *   (node, feature, left/right child) offset from
+   *   [[getLeftRightNodeFeatureOffsets]].
+   */
+  def getImpurityCalculator(nodeFeatureOffset: Int, binIndex: Int): 
ImpurityCalculator = {
+impurityAggregator.getCalculator(allStats, nodeFeatureOffset + binIndex * 
statsSize)
+  }
+
+  /**
+   * Update the stats for a given (node, feature, bin) for ordered features, 
using the given label.
+   */
+  def update(nodeIndex: Int, featureIndex

[2/2] git commit: [SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib] DecisionTree aggregation improvements

2014-09-08 Thread meng
[SPARK-3086] [SPARK-3043] [SPARK-3156] [mllib]  DecisionTree aggregation 
improvements

Summary:
1. Variable numBins for each feature [SPARK-3043]
2. Reduced data reshaping in aggregation [SPARK-3043]
3. Choose ordering for ordered categorical features adaptively [SPARK-3156]
4. Changed nodes to use 1-indexing [SPARK-3086]
5. Small clean-ups

Note: This PR looks bigger than it is since I moved several functions from 
inside findBestSplitsPerGroup to outside of it (to make it clear what was being 
serialized in the aggregation).

Speedups: This update helps most when many features use few bins but a few
features use many bins.  Example speedup results with 2M examples and 3.5K
features (15-worker EC2 cluster):
* Example where old code was reasonably efficient (1/2 continuous, 1/4 binary, 
1/4 20-category): 164.813 --> 116.491 sec
* Example where old code wasted many bins (1/10 continuous, 81/100 binary, 
9/100 20-category): 128.701 --> 39.334 sec

Details:

(1) Variable numBins for each feature [SPARK-3043]

DecisionTreeMetadata now computes a variable numBins for each feature.  It also 
tracks numSplits.
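
As a hedged illustration of what "variable numBins for each feature" can mean, the helper below computes a per-feature bin count; it is a simplified sketch, not the actual DecisionTreeMetadata logic (only the unordered-feature formula comes from the numBins note in the commit list later in this message):

    // Scala sketch: per-feature number of bins (simplified, illustrative only).
    def numBinsFor(featureArity: Int, isUnordered: Boolean, maxBins: Int): Int =
      if (featureArity == 0) {
        maxBins                                 // continuous feature
      } else if (isUnordered) {
        2 * ((1 << (featureArity - 1)) - 1)     // unordered categorical: left/right stats per subset split
      } else {
        math.min(featureArity, maxBins)         // ordered categorical feature
      }

    // A binary feature needs far fewer bins than a 20-category one:
    println(numBinsFor(featureArity = 2, isUnordered = false, maxBins = 32))   // 2
    println(numBinsFor(featureArity = 20, isUnordered = false, maxBins = 32))  // 20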

(2) Reduced data reshaping in aggregation [SPARK-3043]

Added DTStatsAggregator, a wrapper around the aggregate statistics array for 
easy but efficient indexing.
* Added ImpurityAggregator and ImpurityCalculator classes, to make DecisionTree 
code more oblivious to the type of impurity.
* Design note: I originally tried creating Impurity classes which stored data 
and storing the aggregates in an Array[Array[Array[Impurity]]].  However, this 
led to significant slowdowns, perhaps because of overhead in creating so many 
objects.

The aggregate statistics are never reshaped, and cumulative sums are computed 
in-place.
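
To make the flat-array layout concrete, here is a small self-contained sketch that addresses statistics as nodeIndex * nodeStride + featureOffsets(featureIndex) + binIndex * statsSize, the indexing documented in DTStatsAggregator in part [1/2]; the names and the ordered-features-only simplification are illustrative, not the actual implementation:

    // Scala sketch of flat-array indexing for (node, feature, bin) statistics.
    val statsSize = 3                        // e.g. per-bin class counts for 3 classes
    val numBins = Array(4, 2, 8)             // variable number of bins per feature
    val featureOffsets = numBins.scanLeft(0)(_ + _).map(_ * statsSize)
    val nodeStride = featureOffsets.last     // elements per node
    val numNodes = 5
    val allStats = new Array[Double](numNodes * nodeStride)

    // Start index of the stats block for a given (node, feature, bin).
    def statsIndex(node: Int, feature: Int, bin: Int): Int =
      node * nodeStride + featureOffsets(feature) + bin * statsSize

    // An "update": add one labelled example's contribution to a bin, in place.
    allStats(statsIndex(node = 2, feature = 1, bin = 0)) += 1.0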

Updated the layout of aggregation functions.  The update simplifies things by 
(1) dividing features into ordered/unordered (instead of 
ordered/unordered/continuous) and (2) making use of the DTStatsAggregator for 
indexing.
For this update, the following functions were refactored:
* updateBinForOrderedFeature
* updateBinForUnorderedFeature
* binaryOrNotCategoricalBinSeqOp
* multiclassWithCategoricalBinSeqOp
* regressionBinSeqOp
The above 5 functions were replaced with:
* orderedBinSeqOp
* someUnorderedBinSeqOp

Other changes:
* calculateGainForSplit now treats all feature types the same way.
* Eliminated extractLeftRightNodeAggregates.

(3) Choose ordering for ordered categorical features adaptively [SPARK-3156]

Updated binsToBestSplit():
* This now computes cumulative sums of stats for ordered features.
* For ordered categorical features, it chooses an ordering for categories
(this used to be done by findSplitsBins); a brief sketch of the idea follows
the side-effects list below.
* Uses iterators to shorten code and avoid building an 
Array[Array[InformationGainStats]].

Side effects:
* In findSplitsBins: A sample of the data is only taken for data with 
continuous features.  It is not needed for data with only categorical features.
* In findSplitsBins: splits and bins are no longer pre-computed for ordered 
categorical features since they are not needed.
* TreePoint binning is simpler for categorical features.
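
As promised above, a sketch of the category-ordering idea: one common approach is to sort a categorical feature's categories by the average label (centroid) of the examples in each category and then consider prefixes of that ordering as candidate splits. This illustrates the general technique only; the exact criterion used in binsToBestSplit is not reproduced here.

    // Scala sketch: order categories by average label, then split on prefixes.
    val labelSumAndCount: List[(Int, (Double, Long))] =
      List(0 -> (3.0, 10L), 1 -> (9.0, 10L), 2 -> (1.0, 10L))  // category -> (label sum, count)

    val orderedCategories: List[Int] =
      labelSumAndCount
        .sortBy { case (_, (sum, count)) => sum / count }      // average label per category
        .map(_._1)

    // Candidate splits are prefixes of the ordering: {2}, {2, 0}, ...
    println(orderedCategories)   // List(2, 0, 1)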

(4) Changed nodes to use 1-indexing [SPARK-3086]

Nodes used to be indexed from 0.  Now they are indexed from 1.
Node indexing functions are now collected in object Node (Node.scala).
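
For context on the 1-indexing: with the root at index 1, an implicitly stored binary tree has child and parent indices that reduce to bit shifts. The helpers below are illustrative, not necessarily the exact methods collected in object Node:

    // Scala sketch of 1-indexed implicit binary-tree addressing.
    def leftChildIndex(nodeIndex: Int): Int  = nodeIndex << 1
    def rightChildIndex(nodeIndex: Int): Int = (nodeIndex << 1) + 1
    def parentIndex(nodeIndex: Int): Int     = nodeIndex >> 1

    assert(leftChildIndex(1) == 2 && rightChildIndex(1) == 3)
    assert(parentIndex(3) == 1 && parentIndex(2) == 1)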

(5) Small clean-ups

Eliminated functions extractNodeInfo() and extractInfoForLowerLevels() to 
reduce duplicate code.
Eliminated InvalidBinIndex since it is no longer used.

CC: mengxr, manishamde.  Please let me know if you have thoughts on this.
Thanks!

Author: Joseph K. Bradley 

Closes #2125 from jkbradley/dt-opt3alt and squashes the following commits:

42c192a [Joseph K. Bradley] Merge branch 'rfs' into dt-opt3alt
d3cc46b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into 
dt-opt3alt
00e4404 [Joseph K. Bradley] optimization for TreePoint construction 
(pre-computing featureArity and isUnordered as arrays)
425716c [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into 
rfs
a2acea5 [Joseph K. Bradley] Small optimizations based on profiling
aa4e4df [Joseph K. Bradley] Updated DTStatsAggregator with bug fix (nodeString 
should not be multiplied by statsSize)
4651154 [Joseph K. Bradley] Changed numBins semantics for unordered features. * 
Before: numBins = numSplits = (1 << k - 1) - 1 * Now: numBins = 2 * numSplits = 
2 * [(1 << k - 1) - 1] * This also involved changing the semantics of: ** 
DecisionTreeMetadata.numUnorderedBins()
1e3b1c7 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into 
dt-opt3alt
1485fcc [Joseph K. Bradley] Made some DecisionTree methods private.
92f934f [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into 
dt-opt3alt
e676da1 [Joseph K. Bradley] Updated documentation for DecisionTree
37ca845 [Joseph K. Bradley] Fixed problem with how DecisionTree handles ordered 
categorical features.
105f8ab [Joseph K. Bradley] Removed commented-out getEmptyBinA

git commit: [HOTFIX] A left over version change. It should make mima happy.

2014-09-08 Thread prashant
Repository: spark
Updated Branches:
  refs/heads/master eddfeddac -> 0d1cc4ae4


[HOTFIX] A left over version change. It should make mima happy.

Author: Prashant Sharma 

Closes #2317 from ScrapCodes/hotfix and squashes the following commits:

b6472d4 [Prashant Sharma] [HOTFIX] for hotfixes, a left over version change.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d1cc4ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d1cc4ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d1cc4ae

Branch: refs/heads/master
Commit: 0d1cc4ae42e1f73538dd8b9b1880ca9e5b124108
Parents: eddfedd
Author: Prashant Sharma 
Authored: Mon Sep 8 14:32:53 2014 +0530
Committer: Prashant Sharma 
Committed: Mon Sep 8 14:32:53 2014 +0530

--
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d1cc4ae/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a26c2c9..45f6d29 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -184,7 +184,7 @@ object OldDeps {
 
   def versionArtifact(id: String): Option[sbt.ModuleID] = {
 val fullId = id + "_2.10"
-Some("org.apache.spark" % fullId % "1.0.0")
+Some("org.apache.spark" % fullId % "1.1.0")
   }
 
   def oldDepsSettings() = Defaults.defaultSettings ++ Seq(

