git commit: [SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 123425807 -> cd739bd75


[SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path

- [X] Standalone
- [X] YARN
- [X] Mesos
- [X]  Mac OS X
- [X] Linux
- [ ]  Windows

This is another implementation of #1031.

Author: GuoQiang Li wi...@qq.com

Closes #2711 from witgo/SPARK-1719 and squashes the following commits:

c7b26f6 [GuoQiang Li] review commits
4488e41 [GuoQiang Li] Refactoring CommandUtils
a444094 [GuoQiang Li] review commits
40c0b4a [GuoQiang Li] Add buildLocalCommand method
c1a0ddd [GuoQiang Li] fix comments
156ce88 [GuoQiang Li] review commit
38aa377 [GuoQiang Li] Refactor CommandUtils.scala
4269e00 [GuoQiang Li] Refactor SparkSubmitDriverBootstrapper.scala
7a1d634 [GuoQiang Li] use LD_LIBRARY_PATH instead of -Djava.library.path


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd739bd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd739bd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd739bd7

Branch: refs/heads/master
Commit: cd739bd756875bd52e9bd8ae801e0ae10a1f6937
Parents: 1234258
Author: GuoQiang Li wi...@qq.com
Authored: Wed Oct 29 23:02:58 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Wed Oct 29 23:02:58 2014 -0700

--
 bin/spark-class |  6 +-
 .../main/scala/org/apache/spark/SparkConf.scala | 13 
 .../deploy/SparkSubmitDriverBootstrapper.scala  | 17 ++---
 .../spark/deploy/worker/CommandUtils.scala  | 68 
 .../spark/deploy/worker/DriverRunner.scala  | 23 ++-
 .../spark/deploy/worker/ExecutorRunner.scala| 26 +++-
 .../mesos/CoarseMesosSchedulerBackend.scala | 22 ---
 .../cluster/mesos/MesosSchedulerBackend.scala   | 18 +++---
 .../scala/org/apache/spark/util/Utils.scala | 42 +++-
 .../apache/spark/deploy/CommandUtilsSuite.scala | 37 +++
 .../deploy/worker/ExecutorRunnerTest.scala  |  5 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 14 +++-
 .../deploy/yarn/ExecutorRunnableUtil.scala  | 11 +++-
 13 files changed, 221 insertions(+), 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/bin/spark-class
--
diff --git a/bin/spark-class b/bin/spark-class
index 91d858b..925367b 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -81,7 +81,11 @@ case "$1" in
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
-      OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+      if [[ $OSTYPE == darwin* ]]; then
+        export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
+      else
+        export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
+      fi
     fi
     if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
       OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
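The Scala-side launchers touched by this commit (CommandUtils, the Mesos backends, YARN) apply the same idea. A hedged sketch follows, with illustrative helper names rather than the exact methods added to Utils.scala:

```scala
// Hedged sketch only: helper names are illustrative, not the exact Utils API.
object LibraryPathSketch {
  /** The env var the dynamic linker consults on this platform. */
  def libraryPathEnvName: String = {
    val os = sys.props.getOrElse("os.name", "")
    if (os.startsWith("Windows")) "PATH"
    else if (os.startsWith("Mac")) "DYLD_LIBRARY_PATH"
    else "LD_LIBRARY_PATH"
  }

  /** Prepend extraPath to the existing value instead of passing -Djava.library.path. */
  def withLibraryPath(extraPath: String, env: Map[String, String]): Map[String, String] = {
    val name = libraryPathEnvName
    val sep = java.io.File.pathSeparator
    val value = env.get(name).filter(_.nonEmpty).map(extraPath + sep + _).getOrElse(extraPath)
    env + (name -> value)
  }
}
```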

http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/core/src/main/scala/org/apache/spark/SparkConf.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index dbbcc23..ad0a901 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -244,6 +244,19 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     val executorClasspathKey = "spark.executor.extraClassPath"
     val driverOptsKey = "spark.driver.extraJavaOptions"
     val driverClassPathKey = "spark.driver.extraClassPath"
+    val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+
+    // Used by Yarn in 1.1 and before
+    sys.props.get("spark.driver.libraryPath").foreach { value =>
+      val warning =
+        s"""
+          |spark.driver.libraryPath was detected (set to '$value').
+          |This is deprecated in Spark 1.2+.
+          |
+          |Please instead use: $driverLibraryPathKey
+        """.stripMargin
+      logWarning(warning)
+    }
 
     // Validate spark.executor.extraJavaOptions
     settings.get(executorOptsKey).map { javaOpts =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 0125330..2b894a7 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ 

git commit: [SPARK-4102] Remove unused ShuffleReader.stop() method.

2014-10-30 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master cd739bd75 -> 6db315746


[SPARK-4102] Remove unused ShuffleReader.stop() method.

This method is not implemented by the only subclass
(HashShuffleReader), nor is it ever called. While the
use of Scala's fancy ??? was pretty exciting, the method's
existence can only lead to confusion and it therefore should
be deleted.

mateiz was there a reason for adding this that I'm
missing?

Author: Kay Ousterhout kayousterh...@gmail.com

Closes #2966 from kayousterhout/SPARK-4102 and squashes the following commits:

532c564 [Kay Ousterhout] Added back commented-out method, as per Matei's request
904655e [Kay Ousterhout] [SPARK-4102] Remove unused ShuffleReader.stop() method.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6db31574
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6db31574
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6db31574

Branch: refs/heads/master
Commit: 6db3157464e36f7a572ada5f1e7f88730aa23dbd
Parents: cd739bd
Author: Kay Ousterhout kayousterh...@gmail.com
Authored: Wed Oct 29 23:52:46 2014 -0700
Committer: Kay Ousterhout kayousterh...@gmail.com
Committed: Wed Oct 29 23:52:46 2014 -0700

--
 .../main/scala/org/apache/spark/shuffle/ShuffleReader.scala  | 8 ++--
 .../org/apache/spark/shuffle/hash/HashShuffleReader.scala| 3 ---
 2 files changed, 6 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6db31574/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala
--
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala
index b30e366..292e483 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala
@@ -24,6 +24,10 @@ private[spark] trait ShuffleReader[K, C] {
   /** Read the combined key-values for this reduce task */
   def read(): Iterator[Product2[K, C]]
 
-  /** Close this reader */
-  def stop(): Unit
+  /**
+   * Close this reader.
+   * TODO: Add this back when we make the ShuffleReader a developer API that 
others can implement
+   * (at which point this will likely be necessary).
+   */
+  // def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6db31574/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 88a5f1e..5baf45d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -66,7 +66,4 @@ private[spark] class HashShuffleReader[K, C](
 aggregatedIter
 }
   }
-
-  /** Close this reader */
-  override def stop(): Unit = ???
 }





git commit: [SPARK-4130][MLlib] Fixing libSVM parser bug with extra whitespace

2014-10-30 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 6db315746 -> c7ad08520


[SPARK-4130][MLlib] Fixing libSVM parser bug with extra whitespace

This simple patch filters out extra whitespace entries.
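For context, a hedged illustration of the failure mode (the input line is made up): consecutive spaces in a libSVM line produce empty tokens, and an empty token breaks the index:value split that the one-line change below now guards against.

```scala
// Illustrative only; the input line is hypothetical.
val line = "1 1:2.0  3:4.5"              // note the double space before "3:4.5"
val items = line.split(' ')              // Array("1", "1:2.0", "", "3:4.5")
val label = items.head.toDouble
// Without the filter, the empty token reaches split(':') and
// indexAndValue(0).toInt throws a NumberFormatException.
val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
  val indexAndValue = item.split(':')
  (indexAndValue(0).toInt - 1, indexAndValue(1).toDouble)  // 1-based -> 0-based
}.unzip
// indices: Array(0, 2), values: Array(2.0, 4.5)
```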

Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com
Author: Joey joseph.e.gonza...@gmail.com

Closes #2996 from jegonzal/loadLibSVM and squashes the following commits:

e0227ab [Joey] improving readability
e028e84 [Joseph E. Gonzalez] fixing whitespace bug in loadLibSVMFile when 
parsing libSVM files


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7ad0852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7ad0852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7ad0852

Branch: refs/heads/master
Commit: c7ad0852084dc28f3ebc144adfd4928b23f1c8ea
Parents: 6db3157
Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com
Authored: Thu Oct 30 00:05:57 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Thu Oct 30 00:05:57 2014 -0700

--
 mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c7ad0852/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index dce0adf..b88e08b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -76,7 +76,7 @@ object MLUtils {
       .map { line =>
         val items = line.split(' ')
         val label = items.head.toDouble
-        val (indices, values) = items.tail.map { item =>
+        val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
           val indexAndValue = item.split(':')
           val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based.
           val value = indexAndValue(1).toDouble





git commit: SPARK-4111 [MLlib] add regression metrics

2014-10-30 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master c7ad08520 - d9327192e


SPARK-4111 [MLlib] add regression metrics

Add RegressionMetrics.scala as regression metrics used for evaluation and 
corresponding test case RegressionMetricsSuite.scala.
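A hedged usage sketch of the new evaluator, matching the constructor and methods in the diff below; `sc` is assumed to be an existing SparkContext and the numbers are made up.

```scala
import org.apache.spark.mllib.evaluation.RegressionMetrics

// (prediction, observation) pairs; values are illustrative.
val predictionAndObservations = sc.parallelize(Seq(
  (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)))

val metrics = new RegressionMetrics(predictionAndObservations)
println(s"explained variance = ${metrics.explainedVariance}")
println(s"MAE = ${metrics.meanAbsoluteError}")
println(s"MSE = ${metrics.meanSquaredError}")
```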

Author: Yanbo Liang yanboha...@gmail.com
Author: liangyanbo liangya...@meituan.com

Closes #2978 from yanbohappy/regression_metrics and squashes the following 
commits:

730d0a9 [Yanbo Liang] more clearly annotation
3d0bec1 [Yanbo Liang] rename and keep code style
a8ad3e3 [Yanbo Liang] simplify code for keeping style
d454909 [Yanbo Liang] rename parameter and function names, delete unused 
columns, add reference
2e56282 [liangyanbo] rename r2_score() and remove unused column
43bb12b [liangyanbo] add regression metrics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9327192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9327192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9327192

Branch: refs/heads/master
Commit: d9327192eee7f18e92381c59a42b0e1770f1f8f4
Parents: c7ad085
Author: Yanbo Liang yanboha...@gmail.com
Authored: Thu Oct 30 12:00:56 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Thu Oct 30 12:00:56 2014 -0700

--
 .../mllib/evaluation/RegressionMetrics.scala| 89 
 .../evaluation/RegressionMetricsSuite.scala | 52 
 2 files changed, 141 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d9327192/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
new file mode 100644
index 000..693117d
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, 
MultivariateOnlineSummarizer}
+
+/**
+ * :: Experimental ::
+ * Evaluator for regression.
+ *
+ * @param predictionAndObservations an RDD of (prediction, observation) pairs.
+ */
+@Experimental
+class RegressionMetrics(predictionAndObservations: RDD[(Double, Double)]) 
extends Logging {
+
+  /**
+   * Use MultivariateOnlineSummarizer to calculate summary statistics of 
observations and errors.
+   */
+  private lazy val summary: MultivariateStatisticalSummary = {
+    val summary: MultivariateStatisticalSummary = predictionAndObservations.map {
+      case (prediction, observation) => Vectors.dense(observation, observation - prediction)
+    }.aggregate(new MultivariateOnlineSummarizer())(
+      (summary, v) => summary.add(v),
+      (sum1, sum2) => sum1.merge(sum2)
+    )
+    summary
+  }
+
+  /**
+   * Returns the explained variance regression score.
+   * explainedVariance = 1 - variance(y - \hat{y}) / variance(y)
+   * Reference: [[http://en.wikipedia.org/wiki/Explained_variation]]
+   */
+  def explainedVariance: Double = {
+1 - summary.variance(1) / summary.variance(0)
+  }
+
+  /**
+   * Returns the mean absolute error, which is a risk function corresponding 
to the
+   * expected value of the absolute error loss or l1-norm loss.
+   */
+  def meanAbsoluteError: Double = {
+summary.normL1(1) / summary.count
+  }
+
+  /**
+   * Returns the mean squared error, which is a risk function corresponding to 
the
+   * expected value of the squared error loss or quadratic loss.
+   */
+  def meanSquaredError: Double = {
+val rmse = summary.normL2(1) / math.sqrt(summary.count)
+rmse * rmse
+  }
+
+  /**
+   * Returns the root mean squared error, which is defined as the square root 
of
+   * 

git commit: [SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data

2014-10-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master d9327192e -> 234de9232


[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the 
functionality of storage of received data

As part of the initiative to prevent data loss on streaming driver failure, 
this JIRA tracks the subtask of implementing a ReceivedBlockHandler, that 
abstracts the functionality of storage of received data blocks. The default 
implementation will maintain the current behavior of storing the data into 
BlockManager. The optional implementation will store the data to both 
BlockManager as well as a write ahead log.
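A minimal sketch of the abstraction described above, using placeholder types; the real trait and its two implementations (BlockManager-only, and BlockManager plus write ahead log) differ in detail.

```scala
// Placeholder types; illustrative only.
case class StreamBlockId(streamId: Int, uniqueId: Long)
trait ReceivedBlock

/** Metadata describing where a stored block ended up. */
trait ReceivedBlockStoreResult { def blockId: StreamBlockId }

/** Abstracts how a receiver's blocks are persisted. */
trait ReceivedBlockHandler {
  // Default implementation: store into the BlockManager (current behavior).
  // Optional implementation: store into the BlockManager *and* a write ahead log.
  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult
}
```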

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #2940 from tdas/driver-ha-rbh and squashes the following commits:

78a4aaa [Tathagata Das] Fixed bug causing test failures.
f192f47 [Tathagata Das] Fixed import order.
df5f320 [Tathagata Das] Updated code to use ReceivedBlockStoreResult as the 
return type for handler's storeBlock
33c30c9 [Tathagata Das] Added license, and organized imports.
2f025b3 [Tathagata Das] Updates based on PR comments.
18aec1e [Tathagata Das] Moved ReceivedBlockInfo back into 
spark.streaming.scheduler package
95a4987 [Tathagata Das] Added ReceivedBlockHandler and its associated tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/234de923
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/234de923
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/234de923

Branch: refs/heads/master
Commit: 234de9232bcfa212317a8073c4a82c3863b36b14
Parents: d932719
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Oct 30 14:51:13 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Oct 30 14:51:13 2014 -0700

--
 .../dstream/ReceiverInputDStream.scala  |   7 +-
 .../streaming/receiver/ReceivedBlock.scala  |  35 +++
 .../receiver/ReceivedBlockHandler.scala | 193 ++
 .../receiver/ReceiverSupervisorImpl.scala   |  88 ---
 .../spark/streaming/scheduler/BatchInfo.scala   |   2 +-
 .../spark/streaming/scheduler/JobSet.scala  |   3 +-
 .../streaming/scheduler/ReceivedBlockInfo.scala |  28 ++
 .../streaming/scheduler/ReceiverTracker.scala   |  24 +-
 .../util/WriteAheadLogRandomReader.scala|   1 -
 .../streaming/ReceivedBlockHandlerSuite.scala   | 258 +++
 .../streaming/util/WriteAheadLogSuite.scala |  34 ++-
 11 files changed, 603 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 391e409..bb47d37 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, 
BlockManagerBasedStoreResult, Receiver}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.SparkException
 
 /**
  * Abstract class for defining any 
[[org.apache.spark.streaming.dstream.InputDStream]]
@@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     if (validTime >= graph.startTime) {
       val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
       receivedBlockInfo(validTime) = blockInfo
-      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+      Some(new BlockRDD[T](ssc.sc, Array.empty))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
new file mode 100644
index 000..47968af
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed 

git commit: [SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received either from BlockManager or WAL in HDFS

2014-10-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 234de9232 -> fb1fbca20


[SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received either 
from BlockManager or WAL in HDFS

As part of the initiative of preventing data loss on streaming driver failure, 
this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can 
either read data from the Spark's BlockManager, or read the data from 
file-segments in write ahead log in HDFS.
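A hedged sketch of the resulting read path: try the BlockManager first, fall back to the log segment in HDFS. The function parameters stand in for the real BlockManager lookup and write-ahead-log reader calls.

```scala
// Illustrative only; stand-ins for BlockManager.get and the WAL segment reader.
case class WriteAheadLogFileSegment(path: String, offset: Long, length: Int)

def readPartition[T](
    getFromBlockManager: () => Option[Iterator[T]],
    readFromLogSegment: WriteAheadLogFileSegment => Iterator[T],
    segment: WriteAheadLogFileSegment): Iterator[T] = {
  getFromBlockManager() match {
    case Some(iterator) => iterator                    // block still available locally
    case None           => readFromLogSegment(segment) // recover from the WAL in HDFS
  }
}
```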

Most of this code has been written by @harishreedharan

Author: Tathagata Das tathagata.das1...@gmail.com
Author: Hari Shreedharan hshreedha...@apache.org

Closes #2931 from tdas/driver-ha-rdd and squashes the following commits:

209e49c [Tathagata Das] Better fix to style issue.
4a5866f [Tathagata Das] Addressed one more comment.
ed5fbf0 [Tathagata Das] Minor updates.
b0a18b1 [Tathagata Das] Fixed import order.
20aa7c6 [Tathagata Das] Fixed more line length issues.
29aa099 [Tathagata Das] Fixed line length issues.
9e47b5b [Tathagata Das] Renamed class, simplified+added unit tests.
6e1bfb8 [Tathagata Das] Tweaks testuite to create spark contxt lazily to 
prevent contxt leaks.
9c86a61 [Tathagata Das] Merge pull request #22 from 
harishreedharan/driver-ha-rdd
2878c38 [Hari Shreedharan] Shutdown spark context after tests. Formatting/minor 
fixes
c709f2f [Tathagata Das] Merge pull request #21 from 
harishreedharan/driver-ha-rdd
5cce16f [Hari Shreedharan] Make sure getBlockLocations uses offset and length 
to find the blocks on HDFS
eadde56 [Tathagata Das] Transferred HDFSBackedBlockRDD for the 
driver-ha-working branch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1fbca2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1fbca2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1fbca2

Branch: refs/heads/master
Commit: fb1fbca204250840ffdbc0fcbf80b8dfeebf9edb
Parents: 234de92
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Oct 30 15:17:02 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Oct 30 15:17:02 2014 -0700

--
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   4 +
 .../rdd/WriteAheadLogBackedBlockRDD.scala   | 125 +++
 .../apache/spark/streaming/util/HdfsUtils.scala |   8 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 151 +++
 4 files changed, 285 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 2673ec2..fffa191 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, 
@transient val blockIds
 Attempted to use %s after its blocks have been 
removed!.format(toString))
 }
   }
+
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
+locations_
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
new file mode 100644
index 000..23295bf
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark._
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, 

git commit: [SPARK-4078] New FsPermission instance w/o FsPermission.createImmutable in eventlog

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master fb1fbca20 -> 9142c9b80


[SPARK-4078] New FsPermission instance w/o FsPermission.createImmutable in 
eventlog

By default, Spark builds its package against Hadoop 1.0.4 version. In that 
version, it has some FsPermission bug (see [HADOOP-7629] 
(https://issues.apache.org/jira/browse/HADOOP-7629) by Todd Lipcon). This bug 
got fixed since 1.1 version. By using that FsPermission.createImmutable() API, 
end users may see an RPC exception like the one below (if event logging over HDFS is turned on). This patch proposes a quick fix to avoid that exception on all Hadoop versions.
```
Exception in thread "main" java.io.IOException: Call to sr484/10.1.2.84:54310 
failed on local exception: java.io.EOFException
at org.apache.hadoop.ipc.Client.wrapException(Client.java:1150)
at org.apache.hadoop.ipc.Client.call(Client.java:1118)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
at $Proxy6.setPermission(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
at $Proxy6.setPermission(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.setPermission(DFSClient.java:1285)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.setPermission(DistributedFileSystem.java:572)
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:138)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:324)
```
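For clarity, a hedged restatement of the one-line change in the diff further down (permission bits 770 octal, as in the code):

```scala
import org.apache.hadoop.fs.permission.FsPermission

// Removed form: hits the Hadoop 1.0.x FsPermission issue (HADOOP-7629) described above.
val before = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
// Replacement: a plain FsPermission instance behaves the same way across Hadoop versions.
val after = new FsPermission(Integer.parseInt("770", 8).toShort)
```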

Author: Grace jie.hu...@intel.com

Closes #2892 from GraceH/eventlog-rpc and squashes the following commits:

58ea038 [Grace] new FsPermission Instance w/o FsPermission.createImmutable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9142c9b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9142c9b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9142c9b8

Branch: refs/heads/master
Commit: 9142c9b80bfe12e0be8a2b795bf52e403b2c5f30
Parents: fb1fbca
Author: Grace jie.hu...@intel.com
Authored: Thu Oct 30 15:27:32 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:27:32 2014 -0700

--
 .../scala/org/apache/spark/scheduler/EventLoggingListener.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9142c9b8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 100c9ba..597dbc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -142,7 +142,7 @@ private[spark] object EventLoggingListener extends Logging {
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+  val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]





git commit: [SPARK-3319] [SPARK-3338] Resolve Spark submit config paths

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 9142c9b80 -> 24c512925


[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths

The bulk of this PR is comprised of tests. All changes in functionality are 
made in `SparkSubmit.scala` (~20 lines).

**SPARK-3319.** There is currently a divergence in behavior when the user 
passes in additional jars through `--jars` and through setting `spark.jars` in 
the default properties file. The former will happily resolve the paths (e.g. 
convert `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does 
not. We should resolve paths consistently in both cases. This also applies to 
the following pairs of command line arguments and Spark configs:

- `--jars` ~ `spark.jars`
- `--files` ~ `spark.files` / `spark.yarn.dist.files`
- `--archives` ~ `spark.yarn.dist.archives`
- `--py-files` ~ `spark.submit.pyFiles`

**SPARK-3338.** This PR also fixes the following bug: if the user sets 
`spark.submit.pyFiles` in his/her properties file, it does not actually get 
picked up even if `--py-files` is not set. This is simply because the config is 
overridden by an empty string.
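A hedged illustration of the resolution being made consistent here, assuming the working directory is /work; the exact string returned by `Utils.resolveURIs` can vary by version.

```scala
package org.apache.spark.demo  // Utils is private[spark], so this sketch lives inside the package

import org.apache.spark.util.Utils

object ResolvePathsDemo {
  def main(args: Array[String]): Unit = {
    // Comma-separated paths, as they appear in --jars or spark.jars.
    val resolved = Utils.resolveURIs("my.jar,/opt/lib/dep.jar,hdfs:///data/extra.jar")
    println(resolved)
    // Roughly: "file:/work/my.jar,file:/opt/lib/dep.jar,hdfs:///data/extra.jar"
    // (relative paths gain an absolute file: URI; paths with a scheme pass through)
  }
}
```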

Author: Andrew Or andrewo...@gmail.com
Author: Andrew Or and...@databricks.com

Closes #2232 from andrewor14/resolve-config-paths and squashes the following 
commits:

fff2869 [Andrew Or] Add spark.yarn.jar
da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
resolve-config-paths
f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
resolve-config-paths
05e03d6 [Andrew Or] Add tests for resolving both command line and config paths
460117e [Andrew Or] Resolve config paths properly
fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of 
Utils.resolveURI(s)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24c51292
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24c51292
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24c51292

Branch: refs/heads/master
Commit: 24c5129257ce6e3b734f168e860b714c2730b55f
Parents: 9142c9b
Author: Andrew Or andrewo...@gmail.com
Authored: Thu Oct 30 15:29:07 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:29:07 2014 -0700

--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  28 -
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 106 ++-
 .../org/apache/spark/util/UtilsSuite.scala  |  38 +--
 3 files changed, 158 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f97bf67..0379ade 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
         args.files = mergeFileLists(args.files, args.primaryResource)
       }
       args.files = mergeFileLists(args.files, args.pyFiles)
-      // Format python file paths properly before adding them to the PYTHONPATH
-      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
+      if (args.pyFiles != null) {
+        sysProps("spark.submit.pyFiles") = args.pyFiles
+      }
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
       sysProps.getOrElseUpdate(k, v)
     }
 
+    // Resolve paths in certain spark properties
+    val pathConfigs = Seq(
+      "spark.jars",
+      "spark.files",
+      "spark.yarn.jar",
+      "spark.yarn.dist.files",
+      "spark.yarn.dist.archives")
+    pathConfigs.foreach { config =>
+      // Replace old URIs with resolved URIs, if they exist
+      sysProps.get(config).foreach { oldValue =>
+        sysProps(config) = Utils.resolveURIs(oldValue)
+      }
+    }
+
+    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
+    // The resolving part is redundant in the case of --py-files, but necessary if the user
+    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
+    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+      val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+      sysProps("spark.submit.pyFiles") = formattedPyFiles
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

git commit: [SPARK-4138][SPARK-4139] Improve dynamic allocation settings

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 24c512925 -> 26f092d4e


[SPARK-4138][SPARK-4139] Improve dynamic allocation settings

This should be merged after #2746 (SPARK-3795).

**SPARK-4138**. If the user sets both the number of executors and 
`spark.dynamicAllocation.enabled`, we should throw an exception.

**SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should 
use the max number of executors as the starting number of executors because the 
first job is likely to run immediately after application startup. If the latter 
is not set, throw an exception.
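A hedged sketch of the two checks described above; this is a placeholder method, not the actual ClientArguments code.

```scala
// Illustrative only: how the two rules above could be expressed.
def initialNumExecutors(
    numExecutorsExplicitlySet: Boolean,
    dynamicAllocationEnabled: Boolean,
    maxExecutors: Option[Int],
    defaultNumExecutors: Int = 2): Int = {
  if (dynamicAllocationEnabled) {
    // SPARK-4138: a fixed executor count contradicts dynamic allocation.
    require(!numExecutorsExplicitlySet,
      "Explicit number of executors conflicts with spark.dynamicAllocation.enabled")
    // SPARK-4139: start at the max, since the first job usually arrives right away.
    maxExecutors.getOrElse(
      throw new IllegalArgumentException("spark.dynamicAllocation.maxExecutors must be set"))
  } else {
    defaultNumExecutors
  }
}
```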

Author: Andrew Or and...@databricks.com

Closes #3002 from andrewor14/yarn-set-executors and squashes the following 
commits:

c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
yarn-set-executors
55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false
2b0ccec [Andrew Or] Start the number of executors at the max
022bfde [Andrew Or] Guard against incompatible settings of number of executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f092d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f092d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f092d4

Branch: refs/heads/master
Commit: 26f092d4e32cc1f7e279646075eaf1e495395923
Parents: 24c5129
Author: Andrew Or and...@databricks.com
Authored: Thu Oct 30 15:31:23 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:31:23 2014 -0700

--
 .../yarn/ApplicationMasterArguments.scala   |  3 +-
 .../spark/deploy/yarn/ClientArguments.scala | 30 +++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  2 ++
 .../cluster/YarnClusterSchedulerBackend.scala   |  4 +--
 4 files changed, 29 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34..104db4f 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import collection.mutable.ArrayBuffer
 
 class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d..4d85945 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are 
location aware !
 private[spark] class ClientArguments(args: Array[String], sparkConf: 
SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], 
sparkConf: SparkConf)
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
-
   // Additional memory to allocate to containers
   // For now, use driver's memory overhead as our AM container's memory 
overhead
-  val amMemoryOverhead = sparkConf.getInt(spark.yarn.driver.memoryOverhead, 

git commit: [Minor] A few typos in comments and log messages

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 26f092d4e -> 5231a3f22


[Minor] A few typos in comments and log messages

Author: Andrew Or andrewo...@gmail.com
Author: Andrew Or and...@databricks.com

Closes #3021 from andrewor14/typos and squashes the following commits:

daaf417 [Andrew Or] Merge branch 'master' of github.com:apache/spark into typos
4838ae4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into typos
026d426 [Andrew Or] Merge branch 'master' of github.com:andrewor14/spark into 
typos
a81ae8f [Andrew Or] Some typos


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5231a3f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5231a3f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5231a3f2

Branch: refs/heads/master
Commit: 5231a3f228b5482cba09ae23a9f68498eba03c88
Parents: 26f092d
Author: Andrew Or andrewo...@gmail.com
Authored: Thu Oct 30 15:32:11 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:32:11 2014 -0700

--
 .../main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 +-
 .../main/scala/org/apache/spark/scheduler/TaskScheduler.scala| 4 ++--
 .../apache/spark/deploy/yarn/ApplicationMasterArguments.scala| 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 3f345ce..4b5be68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -93,7 +93,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
   }
 } catch {
       case cnd: ClassNotFoundException =>
-// Log an error but keep going here -- the task failed, so not 
catastropic if we can't
+// Log an error but keep going here -- the task failed, so not 
catastrophic if we can't
 // deserialize the reason.
 val loader = Utils.getContextOrSparkClassLoader
 logError(

http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index a129a43..f095915 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -23,7 +23,7 @@ import org.apache.spark.storage.BlockManagerId
 
 /**
  * Low-level task scheduler interface, currently implemented exclusively by 
TaskSchedulerImpl.
- * This interface allows plugging in different task schedulers. Each 
TaskScheduler schedulers tasks
+ * This interface allows plugging in different task schedulers. Each 
TaskScheduler schedules tasks
  * for a single SparkContext. These schedulers get sets of tasks submitted to 
them from the
  * DAGScheduler for each stage, and are responsible for sending the tasks to 
the cluster, running
  * them, retrying if there are failures, and mitigating stragglers. They 
return events to the
@@ -41,7 +41,7 @@ private[spark] trait TaskScheduler {
 
   // Invoked after system has successfully initialized (typically in spark 
context).
   // Yarn uses this to bootstrap allocation of resources based on preferred 
locations,
-  // wait for slave registerations, etc.
+  // wait for slave registrations, etc.
   def postStartHook() { }
 
   // Disconnect from the cluster.

http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 104db4f..8b32c76 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -82,7 +82,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   |  --jar JAR_PATH   Path to your application's JAR file
   |  --class CLASS_NAME   Name of your application's main class
   |  --args ARGS  Arguments to 

git commit: [SPARK-4155] Consolidate usages of <driver>

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 5231a3f22 -> 9334d6996


[SPARK-4155] Consolidate usages of <driver>

We use "<driver>" everywhere. Let's not do that.

Author: Andrew Or and...@databricks.com

Closes #3020 from andrewor14/consolidate-driver and squashes the following 
commits:

c1c2204 [Andrew Or] Just use <driver> for local executor ID
3d751e9 [Andrew Or] Consolidate usages of driver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9334d699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9334d699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9334d699

Branch: refs/heads/master
Commit: 9334d699671edd8f18370255017ad40c1d0340ee
Parents: 5231a3f
Author: Andrew Or and...@databricks.com
Authored: Thu Oct 30 15:32:46 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:32:46 2014 -0700

--
 .../org/apache/spark/ExecutorAllocationManager.scala   |  2 +-
 .../src/main/scala/org/apache/spark/SparkContext.scala |  2 ++
 core/src/main/scala/org/apache/spark/SparkEnv.scala|  2 +-
 .../apache/spark/scheduler/local/LocalBackend.scala|  4 ++--
 .../org/apache/spark/storage/BlockManagerId.scala  |  3 ++-
 .../apache/spark/storage/StorageStatusListener.scala   | 13 ++---
 .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala  |  6 ++
 .../spark/storage/BlockManagerReplicationSuite.scala   |  8 +---
 .../org/apache/spark/storage/BlockManagerSuite.scala   | 10 ++
 9 files changed, 23 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b2cf022..c11f1db 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: 
SparkContext) extends Logging
 
 override def onBlockManagerAdded(blockManagerAdded: 
SparkListenerBlockManagerAdded): Unit = {
   val executorId = blockManagerAdded.blockManagerId.executorId
-      if (executorId != "<driver>") {
+      if (executorId != SparkContext.DRIVER_IDENTIFIER) {
 allocationManager.onExecutorAdded(executorId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73668e8..6bfcd8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1333,6 +1333,8 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
+  private[spark] val DRIVER_IDENTIFIER = "<driver>"
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
 def addInPlace(t1: Double, t2: Double): Double = t1 + t2
 def zero(initialValue: Double) = 0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6a6dfda..557d2f5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,7 @@ object SparkEnv extends Logging {
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
     val hostname = conf.get("spark.driver.host")
     val port = conf.get("spark.driver.port").toInt
-    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+    create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 58b78f0..c026483 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import 

git commit: Minor style hot fix after #2711

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 9334d6996 -> 849b43ec0


Minor style hot fix after #2711

I had planned to fix this when I merged it but I forgot to. witgo

Author: Andrew Or and...@databricks.com

Closes #3018 from andrewor14/command-utils-style and squashes the following 
commits:

c2959fb [Andrew Or] Style hot fix


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849b43ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849b43ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849b43ec

Branch: refs/heads/master
Commit: 849b43ec0f9e4f2ef962a054eb78cd0fc94a142a
Parents: 9334d69
Author: Andrew Or and...@databricks.com
Authored: Thu Oct 30 15:33:34 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:33:34 2014 -0700

--
 .../org/apache/spark/deploy/worker/CommandUtils.scala   | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/849b43ec/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index aba2e20..28e9662 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -37,12 +37,12 @@ object CommandUtils extends Logging {
* The `env` argument is exposed for testing.
*/
   def buildProcessBuilder(
-    command: Command,
-    memory: Int,
-    sparkHome: String,
-    substituteArguments: String => String,
-    classPaths: Seq[String] = Seq[String](),
-    env: Map[String, String] = sys.env): ProcessBuilder = {
+      command: Command,
+      memory: Int,
+      sparkHome: String,
+      substituteArguments: String => String,
+      classPaths: Seq[String] = Seq[String](),
+      env: Map[String, String] = sys.env): ProcessBuilder = {
 val localCommand = buildLocalCommand(command, substituteArguments, 
classPaths, env)
 val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
 val builder = new ProcessBuilder(commandSeq: _*)





git commit: [SPARK-4153][WebUI] Update the sort keys for HistoryPage

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 849b43ec0 -> d34505783


[SPARK-4153][WebUI] Update the sort keys for HistoryPage

Sort Started, Completed, Duration and Last Updated by time.

Author: zsxwing zsxw...@gmail.com

Closes #3014 from zsxwing/SPARK-4153 and squashes the following commits:

ec8b9ad [zsxwing] Sort Started, Completed, Duration and Last Updated by 
time


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3450578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3450578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3450578

Branch: refs/heads/master
Commit: d3450578357d6f7598243ee2ab11c338085ad9c1
Parents: 849b43e
Author: zsxwing zsxw...@gmail.com
Authored: Thu Oct 30 15:33:56 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:33:56 2014 -0700

--
 .../scala/org/apache/spark/deploy/history/HistoryPage.scala  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3450578/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index d25c291..0e249e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -84,11 +84,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage() {
     <tr>
       <td><a href={uiAddress}>{info.id}</a></td>
       <td>{info.name}</td>
-      <td>{startTime}</td>
-      <td>{endTime}</td>
-      <td>{duration}</td>
+      <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
+      <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
+      <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
       <td>{info.sparkUser}</td>
-      <td>{lastUpdated}</td>
+      <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
     </tr>
   }
 }





git commit: [SPARK-3661] Respect spark.*.memory in cluster mode

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master d34505783 -> 2f5454381


[SPARK-3661] Respect spark.*.memory in cluster mode

This also includes minor re-organization of the code. Tested locally in both 
client and deploy modes.
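Roughly, each setting is now filled from the highest-priority source available. A hedged illustration for driver memory, with placeholder values (not the actual SparkSubmitArguments code):

```scala
// Placeholder values; the highest-priority source wins.
val fromCommandLine: Option[String] = None                   // e.g. from --driver-memory 8g
val sparkProperties = Map("spark.driver.memory" -> "4g")     // from --conf or the defaults file

val driverMemory: Option[String] =
  fromCommandLine
    .orElse(sparkProperties.get("spark.driver.memory"))
    .orElse(sys.env.get("SPARK_DRIVER_MEMORY"))              // lowest priority: env var
// Here: Some("4g")
```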

Author: Andrew Or and...@databricks.com
Author: Andrew Or andrewo...@gmail.com

Closes #2697 from andrewor14/memory-cluster-mode and squashes the following 
commits:

01d78bc [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
memory-cluster-mode
ccd468b [Andrew Or] Add some comments per Patrick
c956577 [Andrew Or] Tweak wording
2b4afa0 [Andrew Or] Unused import
47a5a88 [Andrew Or] Correct Spark properties precedence order
bf64717 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
memory-cluster-mode
dd452d0 [Andrew Or] Respect spark.*.memory in cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f545438
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f545438
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f545438

Branch: refs/heads/master
Commit: 2f54543815c0905dc958d444ad638c23a29507c6
Parents: d345057
Author: Andrew Or and...@databricks.com
Authored: Thu Oct 30 15:44:29 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:44:29 2014 -0700

--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  8 +--
 .../spark/deploy/SparkSubmitArguments.scala | 74 
 2 files changed, 45 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f545438/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0379ade..b43e68e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -274,17 +274,11 @@ object SparkSubmit {
       }
     }
 
-    // Properties given with --conf are superceded by other options, but take precedence over
-    // properties in the defaults file.
+    // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }
 
-    // Read from default spark properties, if any
-    for ((k, v) <- args.defaultSparkProperties) {
-      sysProps.getOrElseUpdate(k, v)
-    }
-
     // Resolve paths in certain spark properties
     val pathConfigs = Seq(
       "spark.jars",

http://git-wip-us.apache.org/repos/asf/spark/blob/2f545438/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 72a452e..f0e9ee6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy
 
 import java.util.jar.JarFile
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.util.Utils
@@ -72,39 +71,54 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, St
 defaultProperties
   }
 
-  // Respect SPARK_*_MEMORY for cluster mode
-  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
-  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
-
+  // Set parameters from command line arguments
   parseOpts(args.toList)
-  mergeSparkProperties()
+  // Populate `sparkProperties` map from properties file
+  mergeDefaultSparkProperties()
+  // Use `sparkProperties` map along with env vars to fill in any missing 
parameters
+  loadEnvironmentArguments()
+
   checkRequiredArguments()
 
   /**
-   * Fill in any undefined values based on the default properties file or 
options passed in through
-   * the '--conf' flag.
+   * Merge values from the default properties file with those specified 
through --conf.
+   * When this is called, `sparkProperties` is already filled with configs 
from the latter.
*/
-  private def mergeSparkProperties(): Unit = {
+  private def mergeDefaultSparkProperties(): Unit = {
 // Use common defaults file, if not specified by user
 propertiesFile = 
Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
+    // Honor --conf before the defaults file
+    defaultSparkProperties.foreach { case (k, v) =>
+      if (!sparkProperties.contains(k)) {
+        sparkProperties(k) = v
+      }
+    }
+  }
 
-val 

git commit: SPARK-1209 [CORE] SparkHadoop{MapRed, MapReduce}Util should not use package org.apache.hadoop

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 2f5454381 -> 68cb69daf


SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package 
org.apache.hadoop

(This is just a look at what completely moving the classes would look like. I 
know Patrick flagged that as maybe not OK, although, it's private?)

Author: Sean Owen so...@cloudera.com

Closes #2814 from srowen/SPARK-1209 and squashes the following commits:

ead1115 [Sean Owen] Disable MIMA warnings resulting from moving the class -- 
this was also part of the PairRDDFunctions type hierarchy though?
2d42c1d [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from 
org.apache.hadoop to org.apache.spark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68cb69da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68cb69da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68cb69da

Branch: refs/heads/master
Commit: 68cb69daf3022e973422e496ccf827ca3806ff30
Parents: 2f54543
Author: Sean Owen so...@cloudera.com
Authored: Thu Oct 30 15:54:53 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 15:54:53 2014 -0700

--
 .../hadoop/mapred/SparkHadoopMapRedUtil.scala   | 54 -
 .../mapreduce/SparkHadoopMapReduceUtil.scala| 79 ---
 .../org/apache/spark/SparkHadoopWriter.scala|  1 +
 .../spark/mapred/SparkHadoopMapRedUtil.scala| 56 ++
 .../mapreduce/SparkHadoopMapReduceUtil.scala| 80 
 .../org/apache/spark/rdd/NewHadoopRDD.scala |  1 +
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  3 +-
 project/MimaExcludes.scala  |  8 ++
 .../sql/parquet/ParquetTableOperations.scala|  1 +
 .../spark/sql/hive/hiveWriterContainers.scala   |  1 +
 10 files changed, 150 insertions(+), 134 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68cb69da/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
deleted file mode 100644
index 0c47afa..000
--- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-private[apache]
-trait SparkHadoopMapRedUtil {
-  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
-  "org.apache.hadoop.mapred.JobContext")
-val ctor = klass.getDeclaredConstructor(classOf[JobConf],
-  classOf[org.apache.hadoop.mapreduce.JobID])
-ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
-  }
-
-  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): 
TaskAttemptContext = {
-val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
-  "org.apache.hadoop.mapred.TaskAttemptContext")
-val ctor = klass.getDeclaredConstructor(classOf[JobConf], 
classOf[TaskAttemptID])
-ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
-  }
-
-  def newTaskAttemptID(
-  jtIdentifier: String,
-  jobId: Int,
-  isMap: Boolean,
-  taskId: Int,
-  attemptId: Int) = {
-new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
-  }
-
-  private def firstAvailableClass(first: String, second: String): Class[_] = {
-try {
-  Class.forName(first)
-} catch {
-  case e: ClassNotFoundException =>
-Class.forName(second)
-}
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/68cb69da/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
 

git commit: [SPARK-4120][SQL] Join of multiple tables with syntax like SELECT .. FROM T1, T2, T3.. does not work in SparkSQL

2014-10-30 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 68cb69daf - 9b6ebe33d


[SPARK-4120][SQL] Join of multiple tables with syntax like SELECT .. FROM 
T1,T2,T3.. does not work in SparkSQL

Right now it works for only 2 tables, as in the query below:
sql("SELECT * FROM records1 as a,records2 as b where a.key=b.key ")

But it does not work for more than 2 tables, as in this query:
sql("SELECT * FROM records1 as a,records2 as b,records3 as c where a.key=b.key
and a.key=c.key")
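
The fix below folds the comma-separated relations into a left-deep chain of
inner joins. A standalone sketch of that fold, using illustrative stand-ins
rather than Spark's catalyst plan classes:

// Illustrative stand-ins for logical plan nodes (not catalyst classes).
sealed trait Plan
case class Scan(table: String) extends Plan
case class InnerJoin(left: Plan, right: Plan) extends Plan

// Same shape as the SqlParser change: fold the remaining relations onto the
// first one, producing Join(Join(r1, r2), r3), and so on.
def joinAll(first: Plan, rest: Seq[Plan]): Plan =
  rest.foldLeft(first) { (lhs, r) => InnerJoin(lhs, r) }

// joinAll(Scan("records1"), Seq(Scan("records2"), Scan("records3")))
//   == InnerJoin(InnerJoin(Scan("records1"), Scan("records2")), Scan("records3"))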

Author: ravipesala ravindra.pes...@huawei.com

Closes #2987 from ravipesala/multijoin and squashes the following commits:

429b005 [ravipesala] Support multiple joins


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b6ebe33
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b6ebe33
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b6ebe33

Branch: refs/heads/master
Commit: 9b6ebe33db27be38c3036ffeda17096043fb0fb9
Parents: 68cb69d
Author: ravipesala ravindra.pes...@huawei.com
Authored: Thu Oct 30 17:15:45 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Oct 30 17:15:45 2014 -0700

--
 .../scala/org/apache/spark/sql/catalyst/SqlParser.scala   |  3 ++-
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala   | 10 ++
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b6ebe33/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 0acf725..942b843 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -166,7 +166,8 @@ class SqlParser extends AbstractSparkSQLParser {
   // Based very loosely on the MySQL Grammar.
   // http://dev.mysql.com/doc/refman/5.0/en/join.html
   protected lazy val relations: Parser[LogicalPlan] =
-( relation ~ ("," ~> relation) ^^ { case r1 ~ r2 => Join(r1, r2, Inner, None) }
+( relation ~ rep1("," ~> relation) ^^ {
+case r1 ~ joins => joins.foldLeft(r1) { case(lhs, r) => Join(lhs, r, Inner, None) } }
 | relation
 )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9b6ebe33/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1034c2d..4c36ca0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -899,4 +899,14 @@ class SQLQuerySuite extends QueryTest with 
BeforeAndAfterAll {
   test("SPARK-3814 Support Bitwise ~ operator") {
 checkAnswer(sql("SELECT ~key FROM testData WHERE key = 1 "), -2)
   }
+
+  test("SPARK-4120 Join of multiple tables does not work in SparkSQL") {
+    checkAnswer(
+      sql(
+        """SELECT a.key, b.key, c.key
+          |FROM testData a,testData b,testData c
+          |where a.key = b.key and a.key = c.key
+        """.stripMargin),
+      (1 to 100).map(i => Seq(i, i, i)))
+  }
 }





git commit: [SPARK-3968][SQL] Use parquet-mr filter2 api

2014-10-30 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 9b6ebe33d - 2e35e2429


[SPARK-3968][SQL] Use parquet-mr filter2 api

The parquet-mr project has introduced a new filter API
(https://github.com/apache/incubator-parquet-mr/pull/4), along with several
fixes. It can also eliminate entire row groups based on column statistics such
as min/max.
We can leverage that to further improve the performance of queries with filters.
The filter2 API also introduces the ability to create custom filters. We can
create a custom filter for the optimized In clause (InSet), so that elimination
happens in the ParquetRecordReader itself.
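
For reference, a small sketch of what building a predicate with the filter2
API looks like (the column name and bounds are made up for illustration; the
package names follow the imports added in this patch):

import parquet.filter2.compat.FilterCompat
import parquet.filter2.predicate.FilterApi

// Build (age > 18 AND age < 65) as a FilterPredicate, then wrap it in the
// FilterCompat.Filter form the record reader consumes. Unlike the old
// UnboundRecordFilter API, such predicates can also be evaluated against row
// group statistics, so entire row groups can be skipped.
val age = FilterApi.intColumn("age")
val predicate = FilterApi.and(
  FilterApi.gt(age, Int.box(18)),
  FilterApi.lt(age, Int.box(65)))
val filter: FilterCompat.Filter = FilterCompat.get(predicate)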

Author: Yash Datta yash.da...@guavus.com

Closes #2841 from saucam/master and squashes the following commits:

8282ba0 [Yash Datta] SPARK-3968: fix scala code style and add some more tests 
for filtering on optional columns
515df1c [Yash Datta] SPARK-3968: Add a test case for filter pushdown on 
optional column
5f4530e [Yash Datta] SPARK-3968: Fix scala code style
f304667 [Yash Datta] SPARK-3968: Using task metadata strategy for row group 
filtering
ec53e92 [Yash Datta] SPARK-3968: No push down should result in case we are 
unable to create a record filter
48163c3 [Yash Datta] SPARK-3968: Code cleanup
cc7b596 [Yash Datta] SPARK-3968: 1. Fix RowGroupFiltering not working   
  2. Use the serialization/deserialization from Parquet library for filter 
pushdown
caed851 [Yash Datta] Revert SPARK-3968: Not pushing the filters in case of 
OPTIONAL columns since filtering on optional columns is now supported in 
filter2 api
49703c9 [Yash Datta] SPARK-3968: Not pushing the filters in case of OPTIONAL 
columns
9d09741 [Yash Datta] SPARK-3968: Change parquet filter pushdown to use filter2 
api of parquet-mr


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e35e242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e35e242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e35e242

Branch: refs/heads/master
Commit: 2e35e24294ad8a5e76c89ea888fe330052dabd5a
Parents: 9b6ebe3
Author: Yash Datta yash.da...@guavus.com
Authored: Thu Oct 30 17:17:24 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Oct 30 17:17:31 2014 -0700

--
 pom.xml |   2 +-
 .../spark/sql/parquet/ParquetFilters.scala  | 230 +++
 .../sql/parquet/ParquetTableOperations.scala| 179 ---
 .../spark/sql/parquet/ParquetTestData.scala |  19 ++
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  57 +
 5 files changed, 308 insertions(+), 179 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2e35e242/pom.xml
--
diff --git a/pom.xml b/pom.xml
index e4c9247..379274d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>0.13.1</hive.version.short>
     <derby.version>10.10.1.1</derby.version>
-    <parquet.version>1.4.3</parquet.version>
+    <parquet.version>1.6.0rc3</parquet.version>
     <jblas.version>1.2.3</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>
     <chill.version>0.3.6</chill.version>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e35e242/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 7c83f1c..517a5cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -21,8 +21,12 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 
-import parquet.filter._
-import parquet.filter.ColumnPredicates._
+import parquet.filter2.compat.FilterCompat
+import parquet.filter2.compat.FilterCompat._
+import parquet.filter2.predicate.FilterPredicate
+import parquet.filter2.predicate.FilterApi
+import parquet.filter2.predicate.FilterApi._
+import parquet.io.api.Binary
 import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
@@ -38,67 +42,74 @@ private[sql] object ParquetFilters {
   // set this to false if pushdown should be disabled
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
 
-  def createRecordFilter(filterExpressions: Seq[Expression]): 
UnboundRecordFilter = {
+  def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
 val filters: Seq[CatalystFilter] = filterExpressions.collect {
   case (expression: Expression) if createFilter(expression).isDefined =>
 

git commit: Revert SPARK-1209 [CORE] SparkHadoop{MapRed, MapReduce}Util should not use package org.apache.hadoop

2014-10-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 2e35e2429 - 26d31d15f


Revert SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use 
package org.apache.hadoop

This reverts commit 68cb69daf3022e973422e496ccf827ca3806ff30.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26d31d15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26d31d15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26d31d15

Branch: refs/heads/master
Commit: 26d31d15fda3f63707a28d1a1115770ad127cf8f
Parents: 2e35e24
Author: Andrew Or and...@databricks.com
Authored: Thu Oct 30 17:56:10 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Oct 30 17:56:10 2014 -0700

--
 .../hadoop/mapred/SparkHadoopMapRedUtil.scala   | 54 +
 .../mapreduce/SparkHadoopMapReduceUtil.scala| 79 +++
 .../org/apache/spark/SparkHadoopWriter.scala|  1 -
 .../spark/mapred/SparkHadoopMapRedUtil.scala| 56 --
 .../mapreduce/SparkHadoopMapReduceUtil.scala| 80 
 .../org/apache/spark/rdd/NewHadoopRDD.scala |  1 -
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  3 +-
 project/MimaExcludes.scala  |  8 --
 .../sql/parquet/ParquetTableOperations.scala|  1 -
 .../spark/sql/hive/hiveWriterContainers.scala   |  1 -
 10 files changed, 134 insertions(+), 150 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
new file mode 100644
index 000..0c47afa
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred
+
+private[apache]
+trait SparkHadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
+  "org.apache.hadoop.mapred.JobContext")
+val ctor = klass.getDeclaredConstructor(classOf[JobConf],
+  classOf[org.apache.hadoop.mapreduce.JobID])
+ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+  }
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): 
TaskAttemptContext = {
+val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
+  "org.apache.hadoop.mapred.TaskAttemptContext")
+val ctor = klass.getDeclaredConstructor(classOf[JobConf], 
classOf[TaskAttemptID])
+ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+  }
+
+  def newTaskAttemptID(
+  jtIdentifier: String,
+  jobId: Int,
+  isMap: Boolean,
+  taskId: Int,
+  attemptId: Int) = {
+new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+  }
+
+  private def firstAvailableClass(first: String, second: String): Class[_] = {
+try {
+  Class.forName(first)
+} catch {
+case e: ClassNotFoundException =>
+Class.forName(second)
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
--
diff --git 
a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
 
b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
new file mode 100644
index 000..1fca572
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF 

git commit: HOTFIX: Clean up build in network module.

2014-10-30 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 26d31d15f - 0734d0932


HOTFIX: Clean up build in network module.

This is currently breaking the package build for some people (including me).

This patch does some general clean-up which also fixes the current issue.
- Uses consistent artifact naming
- Adds sbt support for this module
- Changes tests to use scalatest (fixes the original issue[1])

One thing to note: it turns out that scalatest, when invoked in the Maven
build, doesn't successfully detect JUnit Java tests. This is a long-standing
issue; I noticed it applies to all of our current test suites as well. I've
created SPARK-4159 to fix this.

[1] The original issue is that we need to allocate extra memory for the tests,
which happens by default in our scalatest configuration.
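
As a rough illustration only (not part of this patch, and the version string
is an assumption): getting sbt to run JUnit suites alongside ScalaTest ones
usually just needs the junit-interface test dependency, plus forked test JVMs
with extra memory:

// build.sbt sketch; the coordinates exist, but the version is an assumption.
libraryDependencies += "com.novocode" % "junit-interface" % "0.10" % "test"

fork in Test := true
javaOptions in Test += "-Xmx3g"   // allocate extra memory for tests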

Author: Patrick Wendell pwend...@gmail.com

Closes #3025 from pwendell/hotfix and squashes the following commits:

faa9053 [Patrick Wendell] HOTFIX: Clean up build in network module.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0734d093
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0734d093
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0734d093

Branch: refs/heads/master
Commit: 0734d09320fe37edd3a02718511cda0bda852478
Parents: 26d31d1
Author: Patrick Wendell pwend...@gmail.com
Authored: Thu Oct 30 20:15:36 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Thu Oct 30 20:15:36 2014 -0700

--
 core/pom.xml |  2 +-
 network/common/pom.xml   | 34 +-
 project/SparkBuild.scala |  8 +---
 3 files changed, 23 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 8020a2d..6963ce4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -46,7 +46,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>network</artifactId>
+      <artifactId>spark-network-common_2.10</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/network/common/pom.xml
--
diff --git a/network/common/pom.xml b/network/common/pom.xml
index e3b7e32..a33e44b 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -27,12 +27,12 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>network</artifactId>
+  <artifactId>spark-network-common_2.10</artifactId>
   <packaging>jar</packaging>
-  <name>Shuffle Streaming Service</name>
+  <name>Spark Project Common Network Code</name>
   <url>http://spark.apache.org/</url>
   <properties>
-    <sbt.project.name>network</sbt.project.name>
+    <sbt.project.name>network-common</sbt.project.name>
   </properties>
 
   <dependencies>
@@ -60,6 +60,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <scope>test</scope>
@@ -69,25 +74,20 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
-
   <build>
-    <outputDirectory>target/java/classes</outputDirectory>
-    <testOutputDirectory>target/java/test-classes</testOutputDirectory>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.17</version>
-        <configuration>
-          <skipTests>false</skipTests>
-          <includes>
-            <include>**/Test*.java</include>
-            <include>**/*Test.java</include>
-            <include>**/*Suite.java</include>
-          </includes>
-        </configuration>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
       </plugin>
     </plugins>
   </build>

http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6d5eb68..7708351 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -31,10 +31,10 @@ object BuildCommons {
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
   val allProjects@Seq(bagel, catalyst, core, graphx, hive, 

git commit: [SPARK-4124] [MLlib] [PySpark] simplify serialization in MLlib Python API

2014-10-30 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 0734d0932 - 872fc669b


[SPARK-4124] [MLlib] [PySpark] simplify serialization in MLlib Python API

Create several helper functions to call the MLlib Java API, convert arguments
to Java types, and convert return values to Python objects automatically; this
greatly simplifies serialization in the MLlib Python API.

After this, the MLlib Python API does not need to deal with serialization
details anymore, and it is easier to add new APIs.
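
On the Scala side, this mostly means the Py4J-facing methods now accept and
return plain Java/Scala types instead of pickled byte arrays, as in the
trainRegressionModel change below. A minimal sketch of the return-value
pattern (the method name here is illustrative):

import scala.collection.JavaConverters._

// Pack a model's weights and intercept into a java.util.List[Object]; the
// Python caller can then unpack the elements generically instead of relying
// on hand-rolled byte-array serialization.
def packModel(weights: Array[Double], intercept: Double): java.util.List[Object] =
  List(weights, Double.box(intercept)).map(_.asInstanceOf[Object]).asJava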

cc mengxr

Author: Davies Liu dav...@databricks.com

Closes #2995 from davies/cleanup and squashes the following commits:

8fa6ec6 [Davies Liu] address comments
16b85a0 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
cleanup
43743e5 [Davies Liu] bugfix
731331f [Davies Liu] simplify serialization in MLlib Python API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/872fc669
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/872fc669
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/872fc669

Branch: refs/heads/master
Commit: 872fc669b497fb255db3212568f2a14c2ba0d5db
Parents: 0734d09
Author: Davies Liu dav...@databricks.com
Authored: Thu Oct 30 22:25:18 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Thu Oct 30 22:25:18 2014 -0700

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  84 ++--
 python/pyspark/mllib/classification.py  |  30 ++---
 python/pyspark/mllib/clustering.py  |  15 +--
 python/pyspark/mllib/common.py  | 135 +++
 python/pyspark/mllib/feature.py | 122 +++--
 python/pyspark/mllib/linalg.py  |  12 --
 python/pyspark/mllib/random.py  |  34 ++---
 python/pyspark/mllib/recommendation.py  |  62 ++---
 python/pyspark/mllib/regression.py  |  52 +++
 python/pyspark/mllib/stat.py|  65 ++---
 python/pyspark/mllib/tree.py|  55 ++--
 python/pyspark/mllib/util.py|   7 +-
 12 files changed, 287 insertions(+), 386 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/872fc669/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 485abe2..acdc67d 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.api.python
 
 import java.io.OutputStream
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.language.existentials
@@ -72,15 +72,11 @@ class PythonMLLibAPI extends Serializable {
   private def trainRegressionModel(
   learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
   data: JavaRDD[LabeledPoint],
-  initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = 
{
-val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector]
+  initialWeights: Vector): JList[Object] = {
 // Disable the uncached input warning because 'data' is a deliberately 
uncached MappedRDD.
 learner.disableUncachedWarning()
 val model = learner.run(data.rdd, initialWeights)
-val ret = new java.util.LinkedList[java.lang.Object]()
-ret.add(SerDe.dumps(model.weights))
-ret.add(model.intercept: java.lang.Double)
-ret
+List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava
   }
 
   /**
@@ -91,10 +87,10 @@ class PythonMLLibAPI extends Serializable {
   numIterations: Int,
   stepSize: Double,
   miniBatchFraction: Double,
-  initialWeightsBA: Array[Byte], 
+  initialWeights: Vector,
   regParam: Double,
   regType: String,
-  intercept: Boolean): java.util.List[java.lang.Object] = {
+  intercept: Boolean): JList[Object] = {
 val lrAlg = new LinearRegressionWithSGD()
 lrAlg.setIntercept(intercept)
 lrAlg.optimizer
@@ -113,7 +109,7 @@ class PythonMLLibAPI extends Serializable {
 trainRegressionModel(
   lrAlg,
   data,
-  initialWeightsBA)
+  initialWeights)
   }
 
   /**
@@ -125,7 +121,7 @@ class PythonMLLibAPI extends Serializable {
   stepSize: Double,
   regParam: Double,
   miniBatchFraction: Double,
-  initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+  initialWeights: Vector): JList[Object] = {
 val lassoAlg = new 

git commit: [SPARK-3250] Implement Gap Sampling optimization for random sampling

2014-10-30 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 872fc669b - ad3bd0dff


[SPARK-3250] Implement Gap Sampling optimization for random sampling

More efficient sampling, based on Gap Sampling optimization:
http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/
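
A minimal sketch of the idea (not the RandomSampler implementation itself):
instead of drawing one uniform variate per element, draw the gap to the next
accepted element from a geometric distribution, which is much cheaper when the
sampling fraction f is small:

import java.util.Random

def gapSample[T](items: Iterator[T], f: Double, rng: Random = new Random): Iterator[T] = {
  require(f > 0.0 && f < 1.0, "sampling fraction must be in (0, 1)")
  val lnQ = math.log(1.0 - f)
  var it = items
  new Iterator[T] {
    private def advance(): Option[T] = {
      // Number of elements to skip before the next accepted one:
      // P(gap = k) = f * (1 - f)^k, sampled via the inverse-CDF trick.
      val gap = (math.log(1.0 - rng.nextDouble()) / lnQ).toInt
      it = it.drop(gap)
      if (it.hasNext) Some(it.next()) else None
    }
    private var nextElem: Option[T] = advance()
    def hasNext: Boolean = nextElem.isDefined
    def next(): T = { val out = nextElem.get; nextElem = advance(); out }
  }
}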

Author: Erik Erlandson eerla...@redhat.com

Closes #2455 from erikerlandson/spark-3250-pr and squashes the following 
commits:

72496bc [Erik Erlandson] [SPARK-3250] Implement Gap Sampling optimization for 
random sampling


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3bd0df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3bd0df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3bd0df

Branch: refs/heads/master
Commit: ad3bd0dff8997861c5a04438145ba6f91c57a849
Parents: 872fc66
Author: Erik Erlandson eerla...@redhat.com
Authored: Thu Oct 30 22:30:52 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Thu Oct 30 22:30:52 2014 -0700

--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../spark/util/random/RandomSampler.scala   | 286 -
 .../java/org/apache/spark/JavaAPISuite.java |   9 +-
 .../spark/util/random/RandomSamplerSuite.scala  | 606 ---
 .../org/apache/spark/mllib/util/MLUtils.scala   |   4 +-
 5 files changed, 790 insertions(+), 121 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7f125d..c169b2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
BernoulliCellSampler,
+  SamplingUtils}
 
 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
Represents an immutable,
@@ -375,7 +376,8 @@ abstract class RDD[T: ClassTag](
 val sum = weights.sum
 val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
 normalizedCumWeights.sliding(2).map { x =>
-  new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), 
x(1)), true, seed)
+  new PartitionwiseSampledRDD[T, T](
+this, new BernoulliCellSampler[T](x(0), x(1)), true, seed)
 }.toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala 
b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index ee389de..76e7a27 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -19,6 +19,9 @@ package org.apache.spark.util.random
 
 import java.util.Random
 
+import scala.reflect.ClassTag
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.commons.math3.distribution.PoissonDistribution
 
 import org.apache.spark.annotation.DeveloperApi
@@ -38,13 +41,47 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
 throw new NotImplementedError("clone() is not implemented.")
 }
 
+private[spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers. */
+  def newDefaultRNG: Random = new XORShiftRandom
+
+  /**
+   * Default maximum gap-sampling fraction.
+   * For sampling fractions <= this value, the gap sampling optimization will 
be applied.
+   * Above this value, it is assumed that traditional Bernoulli sampling is 
faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs will 
tend to make
+   * the optimal value higher.  The most reliable way to determine this value 
for a new RNG
+   * is to experiment.  When tuning for a new RNG, I would expect a value of 
0.5 to be close
+   * in most cases, as an initial guess.
+   */
+  val defaultMaxGapSamplingFraction = 0.4
+
+  /**
+   * Default epsilon for floating point numbers sampled from the RNG.
+   * The