git commit: [SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path
Repository: spark Updated Branches: refs/heads/master 123425807 - cd739bd75 [SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path - [X] Standalone - [X] YARN - [X] Mesos - [X] Mac OS X - [X] Linux - [ ] Windows This is another implementation about #1031 Author: GuoQiang Li wi...@qq.com Closes #2711 from witgo/SPARK-1719 and squashes the following commits: c7b26f6 [GuoQiang Li] review commits 4488e41 [GuoQiang Li] Refactoring CommandUtils a444094 [GuoQiang Li] review commits 40c0b4a [GuoQiang Li] Add buildLocalCommand method c1a0ddd [GuoQiang Li] fix comments 156ce88 [GuoQiang Li] review commit 38aa377 [GuoQiang Li] Refactor CommandUtils.scala 4269e00 [GuoQiang Li] Refactor SparkSubmitDriverBootstrapper.scala 7a1d634 [GuoQiang Li] use LD_LIBRARY_PATH instead of -Djava.library.path Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd739bd7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd739bd7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd739bd7 Branch: refs/heads/master Commit: cd739bd756875bd52e9bd8ae801e0ae10a1f6937 Parents: 1234258 Author: GuoQiang Li wi...@qq.com Authored: Wed Oct 29 23:02:58 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Wed Oct 29 23:02:58 2014 -0700 -- bin/spark-class | 6 +- .../main/scala/org/apache/spark/SparkConf.scala | 13 .../deploy/SparkSubmitDriverBootstrapper.scala | 17 ++--- .../spark/deploy/worker/CommandUtils.scala | 68 .../spark/deploy/worker/DriverRunner.scala | 23 ++- .../spark/deploy/worker/ExecutorRunner.scala| 26 +++- .../mesos/CoarseMesosSchedulerBackend.scala | 22 --- .../cluster/mesos/MesosSchedulerBackend.scala | 18 +++--- .../scala/org/apache/spark/util/Utils.scala | 42 +++- .../apache/spark/deploy/CommandUtilsSuite.scala | 37 +++ .../deploy/worker/ExecutorRunnerTest.scala | 5 +- .../apache/spark/deploy/yarn/ClientBase.scala | 14 +++- .../deploy/yarn/ExecutorRunnableUtil.scala | 11 +++- 13 files changed, 221 insertions(+), 81 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/bin/spark-class -- diff --git a/bin/spark-class b/bin/spark-class index 91d858b..925367b 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -81,7 +81,11 @@ case $1 in OUR_JAVA_OPTS=$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM} if [ -n $SPARK_SUBMIT_LIBRARY_PATH ]; then - OUR_JAVA_OPTS=$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH + if [[ $OSTYPE == darwin* ]]; then + export DYLD_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH + else + export LD_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH + fi fi if [ -n $SPARK_SUBMIT_DRIVER_MEMORY ]; then OUR_JAVA_MEM=$SPARK_SUBMIT_DRIVER_MEMORY http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/core/src/main/scala/org/apache/spark/SparkConf.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index dbbcc23..ad0a901 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -244,6 +244,19 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { val executorClasspathKey = spark.executor.extraClassPath val driverOptsKey = spark.driver.extraJavaOptions val driverClassPathKey = spark.driver.extraClassPath +val driverLibraryPathKey = spark.driver.extraLibraryPath + +// Used by Yarn in 1.1 and before 
+sys.props.get(spark.driver.libraryPath).foreach { value = + val warning = +s + |spark.driver.libraryPath was detected (set to '$value'). + |This is deprecated in Spark 1.2+. + | + |Please instead use: $driverLibraryPathKey +.stripMargin + logWarning(warning) +} // Validate spark.executor.extraJavaOptions settings.get(executorOptsKey).map { javaOpts = http://git-wip-us.apache.org/repos/asf/spark/blob/cd739bd7/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 0125330..2b894a7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++
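The shell-side change above stops passing -Djava.library.path and instead exports the native library directory through the dynamic linker's environment variable (DYLD_LIBRARY_PATH on OS X, LD_LIBRARY_PATH elsewhere). A self-contained Scala sketch of the same idea for launching a child process follows; the class and method names here are illustrative, not the patch's CommandUtils code.

```scala
import scala.sys.process._

object LibraryPathExample {
  // Pick the platform's dynamic-linker search-path variable, as the spark-class change does.
  def libraryPathEnvName: String =
    if (System.getProperty("os.name").toLowerCase.contains("mac")) "DYLD_LIBRARY_PATH"
    else "LD_LIBRARY_PATH"

  // Prepend the extra library directory to whatever the parent process already has.
  def childEnv(extraLibraryPath: String): (String, String) = {
    val name = libraryPathEnvName
    val existing = sys.env.getOrElse(name, "")
    val value =
      if (existing.isEmpty) extraLibraryPath
      else s"$extraLibraryPath${java.io.File.pathSeparator}$existing"
    name -> value
  }

  def main(args: Array[String]): Unit = {
    // Launch a child JVM with the library path exported in its environment
    // instead of passing -Djava.library.path on its command line.
    val (k, v) = childEnv("/opt/native/lib")
    Process(Seq("java", "-version"), None, k -> v).!
  }
}
```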
git commit: [SPARK-4102] Remove unused ShuffleReader.stop() method.
Repository: spark Updated Branches: refs/heads/master cd739bd75 - 6db315746 [SPARK-4102] Remove unused ShuffleReader.stop() method. This method is not implemented by the only subclass (HashShuffleReader), nor is it ever called. While the use of Scala's fancy ??? was pretty exciting, the method's existence can only lead to confusion and it therefore should be deleted. mateiz was there a reason for adding this that I'm missing? Author: Kay Ousterhout kayousterh...@gmail.com Closes #2966 from kayousterhout/SPARK-4102 and squashes the following commits: 532c564 [Kay Ousterhout] Added back commented-out method, as per Matei's request 904655e [Kay Ousterhout] [SPARK-4102] Remove unused ShuffleReader.stop() method. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6db31574 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6db31574 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6db31574 Branch: refs/heads/master Commit: 6db3157464e36f7a572ada5f1e7f88730aa23dbd Parents: cd739bd Author: Kay Ousterhout kayousterh...@gmail.com Authored: Wed Oct 29 23:52:46 2014 -0700 Committer: Kay Ousterhout kayousterh...@gmail.com Committed: Wed Oct 29 23:52:46 2014 -0700 -- .../main/scala/org/apache/spark/shuffle/ShuffleReader.scala | 8 ++-- .../org/apache/spark/shuffle/hash/HashShuffleReader.scala| 3 --- 2 files changed, 6 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6db31574/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala index b30e366..292e483 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleReader.scala @@ -24,6 +24,10 @@ private[spark] trait ShuffleReader[K, C] { /** Read the combined key-values for this reduce task */ def read(): Iterator[Product2[K, C]] - /** Close this reader */ - def stop(): Unit + /** + * Close this reader. + * TODO: Add this back when we make the ShuffleReader a developer API that others can implement + * (at which point this will likely be necessary). + */ + // def stop(): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/6db31574/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 88a5f1e..5baf45d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -66,7 +66,4 @@ private[spark] class HashShuffleReader[K, C]( aggregatedIter } } - - /** Close this reader */ - override def stop(): Unit = ??? } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
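For background on why the dangling method was confusing: the removed body was `override def stop(): Unit = ???`, and Scala's `???` compiles cleanly but throws at runtime, so any future caller would only discover the gap when the method actually ran. A tiny standalone illustration (not Spark code):

```scala
trait Reader {
  def read(): Iterator[Int]
  def stop(): Unit
}

class InMemoryReader(data: Seq[Int]) extends Reader {
  def read(): Iterator[Int] = data.iterator
  // ??? satisfies the compiler but throws scala.NotImplementedError when called.
  def stop(): Unit = ???
}

object NotImplementedDemo extends App {
  val r = new InMemoryReader(Seq(1, 2, 3))
  println(r.read().sum)            // 6
  try r.stop() catch {
    case e: NotImplementedError => println(s"stop() failed at runtime: $e")
  }
}
```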
git commit: [SPARK-4130][MLlib] Fixing libSVM parser bug with extra whitespace
Repository: spark Updated Branches: refs/heads/master 6db315746 - c7ad08520 [SPARK-4130][MLlib] Fixing libSVM parser bug with extra whitespace This simple patch filters out extra whitespace entries. Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Author: Joey joseph.e.gonza...@gmail.com Closes #2996 from jegonzal/loadLibSVM and squashes the following commits: e0227ab [Joey] improving readability e028e84 [Joseph E. Gonzalez] fixing whitespace bug in loadLibSVMFile when parsing libSVM files Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7ad0852 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7ad0852 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7ad0852 Branch: refs/heads/master Commit: c7ad0852084dc28f3ebc144adfd4928b23f1c8ea Parents: 6db3157 Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Authored: Thu Oct 30 00:05:57 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Oct 30 00:05:57 2014 -0700 -- mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c7ad0852/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index dce0adf..b88e08b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -76,7 +76,7 @@ object MLUtils { .map { line = val items = line.split(' ') val label = items.head.toDouble -val (indices, values) = items.tail.map { item = +val (indices, values) = items.tail.filter(_.nonEmpty).map { item = val indexAndValue = item.split(':') val index = indexAndValue(0).toInt - 1 // Convert 1-based indices to 0-based. val value = indexAndValue(1).toDouble - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
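To see why the filter matters: splitting a line that contains a run of spaces yields empty tokens, and the subsequent `split(':')` on an empty token produces a malformed entry. A standalone illustration of the fixed parsing step on plain Scala collections:

```scala
object LibSVMParseDemo extends App {
  // A label followed by index:value pairs, with an accidental double space.
  val line = "1.0 1:0.5  3:2.0"

  val items = line.split(' ')
  println(items.toList)            // List(1.0, 1:0.5, , 3:2.0) -- note the empty entry

  val label = items.head.toDouble
  val (indices, values) = items.tail.filter(_.nonEmpty).map { item =>
    val indexAndValue = item.split(':')
    val index = indexAndValue(0).toInt - 1   // convert 1-based indices to 0-based
    val value = indexAndValue(1).toDouble
    (index, value)
  }.unzip

  println(s"label=$label indices=${indices.toList} values=${values.toList}")
}
```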
git commit: SPARK-4111 [MLlib] add regression metrics
Repository: spark Updated Branches: refs/heads/master c7ad08520 - d9327192e SPARK-4111 [MLlib] add regression metrics Add RegressionMetrics.scala as regression metrics used for evaluation and corresponding test case RegressionMetricsSuite.scala. Author: Yanbo Liang yanboha...@gmail.com Author: liangyanbo liangya...@meituan.com Closes #2978 from yanbohappy/regression_metrics and squashes the following commits: 730d0a9 [Yanbo Liang] more clearly annotation 3d0bec1 [Yanbo Liang] rename and keep code style a8ad3e3 [Yanbo Liang] simplify code for keeping style d454909 [Yanbo Liang] rename parameter and function names, delete unused columns, add reference 2e56282 [liangyanbo] rename r2_score() and remove unused column 43bb12b [liangyanbo] add regression metrics Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9327192 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9327192 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9327192 Branch: refs/heads/master Commit: d9327192eee7f18e92381c59a42b0e1770f1f8f4 Parents: c7ad085 Author: Yanbo Liang yanboha...@gmail.com Authored: Thu Oct 30 12:00:56 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Oct 30 12:00:56 2014 -0700 -- .../mllib/evaluation/RegressionMetrics.scala| 89 .../evaluation/RegressionMetricsSuite.scala | 52 2 files changed, 141 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d9327192/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala new file mode 100644 index 000..693117d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, MultivariateOnlineSummarizer} + +/** + * :: Experimental :: + * Evaluator for regression. + * + * @param predictionAndObservations an RDD of (prediction, observation) pairs. + */ +@Experimental +class RegressionMetrics(predictionAndObservations: RDD[(Double, Double)]) extends Logging { + + /** + * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors. 
+ */ + private lazy val summary: MultivariateStatisticalSummary = { +val summary: MultivariateStatisticalSummary = predictionAndObservations.map { + case (prediction, observation) = Vectors.dense(observation, observation - prediction) +}.aggregate(new MultivariateOnlineSummarizer())( +(summary, v) = summary.add(v), +(sum1, sum2) = sum1.merge(sum2) + ) +summary + } + + /** + * Returns the explained variance regression score. + * explainedVariance = 1 - variance(y - \hat{y}) / variance(y) + * Reference: [[http://en.wikipedia.org/wiki/Explained_variation]] + */ + def explainedVariance: Double = { +1 - summary.variance(1) / summary.variance(0) + } + + /** + * Returns the mean absolute error, which is a risk function corresponding to the + * expected value of the absolute error loss or l1-norm loss. + */ + def meanAbsoluteError: Double = { +summary.normL1(1) / summary.count + } + + /** + * Returns the mean squared error, which is a risk function corresponding to the + * expected value of the squared error loss or quadratic loss. + */ + def meanSquaredError: Double = { +val rmse = summary.normL2(1) / math.sqrt(summary.count) +rmse * rmse + } + + /** + * Returns the root mean squared error, which is defined as the square root of + *
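A short usage sketch for the new evaluator, on toy (prediction, observation) pairs and a local SparkContext:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics

object RegressionMetricsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("regression-metrics").setMaster("local"))

    // (prediction, observation) pairs, as expected by the constructor.
    val predictionAndObservations = sc.parallelize(Seq(
      (2.5, 3.0),
      (0.0, -0.5),
      (2.0, 2.0),
      (8.0, 7.0)))

    val metrics = new RegressionMetrics(predictionAndObservations)
    println(s"explained variance = ${metrics.explainedVariance}")
    println(s"MAE = ${metrics.meanAbsoluteError}")
    println(s"MSE = ${metrics.meanSquaredError}")

    sc.stop()
  }
}
```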
git commit: [SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data
Repository: spark Updated Branches: refs/heads/master d9327192e - 234de9232 [SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data As part of the initiative to prevent data loss on streaming driver failure, this JIRA tracks the subtask of implementing a ReceivedBlockHandler, that abstracts the functionality of storage of received data blocks. The default implementation will maintain the current behavior of storing the data into BlockManager. The optional implementation will store the data to both BlockManager as well as a write ahead log. Author: Tathagata Das tathagata.das1...@gmail.com Closes #2940 from tdas/driver-ha-rbh and squashes the following commits: 78a4aaa [Tathagata Das] Fixed bug causing test failures. f192f47 [Tathagata Das] Fixed import order. df5f320 [Tathagata Das] Updated code to use ReceivedBlockStoreResult as the return type for handler's storeBlock 33c30c9 [Tathagata Das] Added license, and organized imports. 2f025b3 [Tathagata Das] Updates based on PR comments. 18aec1e [Tathagata Das] Moved ReceivedBlockInfo back into spark.streaming.scheduler package 95a4987 [Tathagata Das] Added ReceivedBlockHandler and its associated tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/234de923 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/234de923 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/234de923 Branch: refs/heads/master Commit: 234de9232bcfa212317a8073c4a82c3863b36b14 Parents: d932719 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Oct 30 14:51:13 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Oct 30 14:51:13 2014 -0700 -- .../dstream/ReceiverInputDStream.scala | 7 +- .../streaming/receiver/ReceivedBlock.scala | 35 +++ .../receiver/ReceivedBlockHandler.scala | 193 ++ .../receiver/ReceiverSupervisorImpl.scala | 88 --- .../spark/streaming/scheduler/BatchInfo.scala | 2 +- .../spark/streaming/scheduler/JobSet.scala | 3 +- .../streaming/scheduler/ReceivedBlockInfo.scala | 28 ++ .../streaming/scheduler/ReceiverTracker.scala | 24 +- .../util/WriteAheadLogRandomReader.scala| 1 - .../streaming/ReceivedBlockHandlerSuite.scala | 258 +++ .../streaming/util/WriteAheadLogSuite.scala | 34 ++- 11 files changed, 603 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 391e409..bb47d37 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -23,8 +23,9 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ -import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver} import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.SparkException /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : 
StreamingCont if (validTime = graph.startTime) { val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) receivedBlockInfo(validTime) = blockInfo - val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) + val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] } Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) + Some(new BlockRDD[T](ssc.sc, Array.empty)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/234de923/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala new file mode 100644 index 000..47968af --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala @@ -0,0 +1,35 @@ +/* + * Licensed
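To make the abstraction concrete, here is a heavily simplified, self-contained sketch of the interface described above. The real trait lives in org.apache.spark.streaming.receiver, works with Spark's BlockId/BlockManager/write-ahead-log types, and ships a second implementation that writes to both the BlockManager and a write-ahead log; everything below uses illustrative stand-in types.

```scala
// Stand-in for the data handed over by a receiver.
sealed trait ReceivedBlock
case class ArrayBufferBlock(data: Seq[Any]) extends ReceivedBlock

// What the receiver gets back: enough information to later build a BlockRDD
// (a block id, and optionally a write-ahead-log segment).
trait ReceivedBlockStoreResult { def blockId: String }
case class BlockManagerBasedStoreResult(blockId: String) extends ReceivedBlockStoreResult
case class WriteAheadLogBasedStoreResult(blockId: String, logSegment: String)
  extends ReceivedBlockStoreResult

// The abstraction: how a received block is persisted is pluggable.
trait ReceivedBlockHandler {
  def storeBlock(blockId: String, block: ReceivedBlock): ReceivedBlockStoreResult
}

// Default behavior: keep the block only in the block manager (here, an in-memory map).
class BlockManagerBasedHandler extends ReceivedBlockHandler {
  private val store = scala.collection.mutable.Map.empty[String, ReceivedBlock]
  def storeBlock(blockId: String, block: ReceivedBlock): ReceivedBlockStoreResult = {
    store(blockId) = block
    BlockManagerBasedStoreResult(blockId)
  }
}

object HandlerDemo extends App {
  val handler = new BlockManagerBasedHandler
  println(handler.storeBlock("input-0-7", ArrayBufferBlock(Seq("event-1", "event-2"))))
}
```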
git commit: [SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received data either from BlockManager or WAL in HDFS
Repository: spark Updated Branches: refs/heads/master 234de9232 - fb1fbca20 [SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received either from BlockManager or WAL in HDFS As part of the initiative of preventing data loss on streaming driver failure, this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can either read data from the Spark's BlockManager, or read the data from file-segments in write ahead log in HDFS. Most of this code has been written by @harishreedharan Author: Tathagata Das tathagata.das1...@gmail.com Author: Hari Shreedharan hshreedha...@apache.org Closes #2931 from tdas/driver-ha-rdd and squashes the following commits: 209e49c [Tathagata Das] Better fix to style issue. 4a5866f [Tathagata Das] Addressed one more comment. ed5fbf0 [Tathagata Das] Minor updates. b0a18b1 [Tathagata Das] Fixed import order. 20aa7c6 [Tathagata Das] Fixed more line length issues. 29aa099 [Tathagata Das] Fixed line length issues. 9e47b5b [Tathagata Das] Renamed class, simplified+added unit tests. 6e1bfb8 [Tathagata Das] Tweaks testuite to create spark contxt lazily to prevent contxt leaks. 9c86a61 [Tathagata Das] Merge pull request #22 from harishreedharan/driver-ha-rdd 2878c38 [Hari Shreedharan] Shutdown spark context after tests. Formatting/minor fixes c709f2f [Tathagata Das] Merge pull request #21 from harishreedharan/driver-ha-rdd 5cce16f [Hari Shreedharan] Make sure getBlockLocations uses offset and length to find the blocks on HDFS eadde56 [Tathagata Das] Transferred HDFSBackedBlockRDD for the driver-ha-working branch Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1fbca2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1fbca2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1fbca2 Branch: refs/heads/master Commit: fb1fbca204250840ffdbc0fcbf80b8dfeebf9edb Parents: 234de92 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Oct 30 15:17:02 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Oct 30 15:17:02 2014 -0700 -- .../scala/org/apache/spark/rdd/BlockRDD.scala | 4 + .../rdd/WriteAheadLogBackedBlockRDD.scala | 125 +++ .../apache/spark/streaming/util/HdfsUtils.scala | 8 +- .../rdd/WriteAheadLogBackedBlockRDDSuite.scala | 151 +++ 4 files changed, 285 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 2673ec2..fffa191 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds Attempted to use %s after its blocks have been removed!.format(toString)) } } + + protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = { +locations_ + } } http://git-wip-us.apache.org/repos/asf/spark/blob/fb1fbca2/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala new file mode 100644 index 000..23295bf --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ 
-0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark._ +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId,
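The behaviour the new RDD adds can be summarised as: for each partition, try the BlockManager first and fall back to reading the corresponding write-ahead-log segment on HDFS. A toy, self-contained sketch of that fallback logic; all types below are stand-ins, not the actual private API.

```scala
// Per-partition inputs: a block id plus the WAL segment that also holds the data.
case class WalSegment(path: String, offset: Long, length: Int)
case class PartitionInfo(blockId: String, segment: WalSegment)

class WalBackedBlockReader(
    blockManager: collection.Map[String, Seq[String]],   // blockId -> cached records
    readSegment: WalSegment => Seq[String]) {             // fallback reader for the log

  /** Prefer the in-memory copy; fall back to the write-ahead log on HDFS. */
  def read(p: PartitionInfo): Iterator[String] =
    blockManager.get(p.blockId) match {
      case Some(records) => records.iterator                 // block still in the BlockManager
      case None          => readSegment(p.segment).iterator  // executor lost it: replay the WAL
    }
}

object WalBackedBlockReaderDemo extends App {
  val bm = Map("input-0-1" -> Seq("a", "b"))
  val reader = new WalBackedBlockReader(bm, seg => Seq(s"replayed from ${seg.path}"))
  println(reader.read(PartitionInfo("input-0-1", WalSegment("/wal/log-1", 0, 128))).toList)
  println(reader.read(PartitionInfo("input-0-2", WalSegment("/wal/log-2", 0, 64))).toList)
}
```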
git commit: [SPARK-4078] New FsPermission instance w/o FsPermission.createImmutable in eventlog
Repository: spark Updated Branches: refs/heads/master fb1fbca20 - 9142c9b80 [SPARK-4078] New FsPermission instance w/o FsPermission.createImmutable in eventlog By default, Spark builds its package against Hadoop 1.0.4 version. In that version, it has some FsPermission bug (see [HADOOP-7629] (https://issues.apache.org/jira/browse/HADOOP-7629) by Todd Lipcon). This bug got fixed since 1.1 version. By using that FsPermission.createImmutable() API, end-user may see some RPC exception like below (if turn on eventlog over HDFS). Here proposes a quick fix to avoid certain exception for all hadoop versions. ``` Exception in thread main java.io.IOException: Call to sr484/10.1.2.84:54310 failed on local exception: java.io.EOFException at org.apache.hadoop.ipc.Client.wrapException(Client.java:1150) at org.apache.hadoop.ipc.Client.call(Client.java:1118) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229) at $Proxy6.setPermission(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62) at $Proxy6.setPermission(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.setPermission(DFSClient.java:1285) at org.apache.hadoop.hdfs.DistributedFileSystem.setPermission(DistributedFileSystem.java:572) at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:138) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:324) ``` Author: Grace jie.hu...@intel.com Closes #2892 from GraceH/eventlog-rpc and squashes the following commits: 58ea038 [Grace] new FsPermission Instance w/o FsPermission.createImmutable Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9142c9b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9142c9b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9142c9b8 Branch: refs/heads/master Commit: 9142c9b80bfe12e0be8a2b795bf52e403b2c5f30 Parents: fb1fbca Author: Grace jie.hu...@intel.com Authored: Thu Oct 30 15:27:32 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:27:32 2014 -0700 -- .../scala/org/apache/spark/scheduler/EventLoggingListener.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9142c9b8/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 100c9ba..597dbc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -142,7 +142,7 @@ private[spark] object EventLoggingListener extends Logging { val SPARK_VERSION_PREFIX = SPARK_VERSION_ val COMPRESSION_CODEC_PREFIX = COMPRESSION_CODEC_ val APPLICATION_COMPLETE = APPLICATION_COMPLETE - val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt(770, 8).toShort) + val 
LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec]
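For reference, the one-line change boils down to constructing the permission directly; the commented-out line shows the previous form that trips HADOOP-7629 on Hadoop 1.0.x. The snippet requires hadoop-common on the classpath.

```scala
import org.apache.hadoop.fs.permission.FsPermission

object LogFilePermission {
  // Before: relies on FsPermission.createImmutable, whose immutable subclass breaks
  // RPC serialization on Hadoop 1.0.x (HADOOP-7629), surfacing as an EOFException.
  // val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

  // After: a plain FsPermission instance works across Hadoop versions.
  val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

  def main(args: Array[String]): Unit =
    println(LOG_FILE_PERMISSIONS)   // prints rwxrwx---
}
```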
git commit: [SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
Repository: spark Updated Branches: refs/heads/master 9142c9b80 - 24c512925 [SPARK-3319] [SPARK-3338] Resolve Spark submit config paths The bulk of this PR is comprised of tests. All changes in functionality are made in `SparkSubmit.scala` (~20 lines). **SPARK-3319.** There is currently a divergence in behavior when the user passes in additional jars through `--jars` and through setting `spark.jars` in the default properties file. The former will happily resolve the paths (e.g. convert `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does not. We should resolve paths consistently in both cases. This also applies to the following pairs of command line arguments and Spark configs: - `--jars` ~ `spark.jars` - `--files` ~ `spark.files` / `spark.yarn.dist.files` - `--archives` ~ `spark.yarn.dist.archives` - `--py-files` ~ `spark.submit.pyFiles` **SPARK-3338.** This PR also fixes the following bug: if the user sets `spark.submit.pyFiles` in his/her properties file, it does not actually get picked up even if `--py-files` is not set. This is simply because the config is overridden by an empty string. Author: Andrew Or andrewo...@gmail.com Author: Andrew Or and...@databricks.com Closes #2232 from andrewor14/resolve-config-paths and squashes the following commits: fff2869 [Andrew Or] Add spark.yarn.jar da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths 05e03d6 [Andrew Or] Add tests for resolving both command line and config paths 460117e [Andrew Or] Resolve config paths properly fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24c51292 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24c51292 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24c51292 Branch: refs/heads/master Commit: 24c5129257ce6e3b734f168e860b714c2730b55f Parents: 9142c9b Author: Andrew Or andrewo...@gmail.com Authored: Thu Oct 30 15:29:07 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:29:07 2014 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 28 - .../apache/spark/deploy/SparkSubmitSuite.scala | 106 ++- .../org/apache/spark/util/UtilsSuite.scala | 38 +-- 3 files changed, 158 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index f97bf67..0379ade 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -158,8 +158,9 @@ object SparkSubmit { args.files = mergeFileLists(args.files, args.primaryResource) } args.files = mergeFileLists(args.files, args.pyFiles) - // Format python file paths properly before adding them to the PYTHONPATH - sysProps(spark.submit.pyFiles) = PythonRunner.formatPaths(args.pyFiles).mkString(,) + if (args.pyFiles != null) { +sysProps(spark.submit.pyFiles) = args.pyFiles + } } // Special flag to avoid deprecation warnings at the client @@ -284,6 +285,29 @@ object SparkSubmit { sysProps.getOrElseUpdate(k, v) } +// Resolve paths in certain spark properties +val pathConfigs = Seq( + spark.jars, + spark.files, + spark.yarn.jar, + 
"spark.yarn.dist.files",
+      "spark.yarn.dist.archives")
+    pathConfigs.foreach { config =>
+      // Replace old URIs with resolved URIs, if they exist
+      sysProps.get(config).foreach { oldValue =>
+        sysProps(config) = Utils.resolveURIs(oldValue)
+      }
+    }
+
+    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
+    // The resolving part is redundant in the case of --py-files, but necessary if the user
+    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
+    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+      val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+      sysProps("spark.submit.pyFiles") = formattedPyFiles
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/24c51292/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
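The user-visible effect is that bare local paths in these configs now behave like their command-line equivalents (e.g. `my.jar` becomes `file:/absolute/path/to/my.jar`). A rough, self-contained model of what the URI resolution does here; this is an approximation for illustration, not the actual Utils.resolveURIs implementation.

```scala
import java.io.File
import java.net.URI

object ResolveUriSketch {
  /** Rough model of the resolution applied to path-valued configs:
    * leave fully-qualified URIs alone, turn bare local paths into file: URIs. */
  def resolveURI(path: String): String = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri.toString
    else new File(path).getAbsoluteFile.toURI.toString
  }

  /** Comma-separated lists (spark.jars, spark.files, ...) are resolved entry by entry. */
  def resolveURIs(paths: String): String =
    paths.split(",").filter(_.nonEmpty).map(resolveURI).mkString(",")

  def main(args: Array[String]): Unit = {
    println(resolveURIs("my.jar,hdfs://nn:8020/lib/dep.jar"))
    // e.g. file:/current/working/dir/my.jar,hdfs://nn:8020/lib/dep.jar
  }
}
```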
git commit: [SPARK-4138][SPARK-4139] Improve dynamic allocation settings
Repository: spark Updated Branches: refs/heads/master 24c512925 - 26f092d4e [SPARK-4138][SPARK-4139] Improve dynamic allocation settings This should be merged after #2746 (SPARK-3795). **SPARK-4138**. If the user sets both the number of executors and `spark.dynamicAllocation.enabled`, we should throw an exception. **SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should use the max number of executors as the starting number of executors because the first job is likely to run immediately after application startup. If the latter is not set, throw an exception. Author: Andrew Or and...@databricks.com Closes #3002 from andrewor14/yarn-set-executors and squashes the following commits: c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors 55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false 2b0ccec [Andrew Or] Start the number of executors at the max 022bfde [Andrew Or] Guard against incompatible settings of number of executors Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f092d4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f092d4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f092d4 Branch: refs/heads/master Commit: 26f092d4e32cc1f7e279646075eaf1e495395923 Parents: 24c5129 Author: Andrew Or and...@databricks.com Authored: Thu Oct 30 15:31:23 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:31:23 2014 -0700 -- .../yarn/ApplicationMasterArguments.scala | 3 +- .../spark/deploy/yarn/ClientArguments.scala | 30 +++- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 ++ .../cluster/YarnClusterSchedulerBackend.scala | 4 +-- 4 files changed, 29 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 5c54e34..104db4f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import org.apache.spark.util.{MemoryParam, IntParam} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import collection.mutable.ArrayBuffer class ApplicationMasterArguments(val args: Array[String]) { @@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 var executorCores = 1 - var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + var numExecutors = DEFAULT_NUMBER_EXECUTORS parseArgs(args.toList) http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index a12f82d..4d85945 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.SparkConf -import org.apache.spark.util.{Utils, IntParam, MemoryParam} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.util.{Utils, IntParam, MemoryParam} // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { @@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 // MB var executorCores = 1 - var numExecutors = 2 + var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get(spark.yarn.queue, default) var amMemory: Int = 512 // MB var appName: String = Spark var priority = 0 - parseArgs(args.toList) - loadEnvironmentArgs() - // Additional memory to allocate to containers // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt(spark.yarn.driver.memoryOverhead,
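A rough model of the new ClientArguments behaviour for picking the starting executor count: reject an explicit executor count when dynamic allocation is enabled, and otherwise start from the configured maximum. This is a simplified sketch; the real code also reads --num-executors and SPARK_EXECUTOR_INSTANCES.

```scala
import org.apache.spark.SparkConf

object DynamicAllocationArgsSketch {
  def initialNumExecutors(
      conf: SparkConf,
      numExecutorsSetByUser: Boolean,
      default: Int = 2): Int = {
    val dynamicEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
    if (dynamicEnabled) {
      // SPARK-4138: an explicit executor count contradicts dynamic allocation.
      require(!numExecutorsSetByUser,
        "Explicitly setting the number of executors is not compatible with " +
          "spark.dynamicAllocation.enabled")
      // SPARK-4139: start at the maximum, since the first job usually runs right away.
      val maxKey = "spark.dynamicAllocation.maxExecutors"
      require(conf.contains(maxKey), s"$maxKey must be set when dynamic allocation is enabled")
      conf.getInt(maxKey, default)
    } else {
      default
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.maxExecutors", "50")
    println(initialNumExecutors(conf, numExecutorsSetByUser = false))   // 50
  }
}
```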
git commit: [Minor] A few typos in comments and log messages
Repository: spark Updated Branches: refs/heads/master 26f092d4e - 5231a3f22 [Minor] A few typos in comments and log messages Author: Andrew Or andrewo...@gmail.com Author: Andrew Or and...@databricks.com Closes #3021 from andrewor14/typos and squashes the following commits: daaf417 [Andrew Or] Merge branch 'master' of github.com:apache/spark into typos 4838ae4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into typos 026d426 [Andrew Or] Merge branch 'master' of github.com:andrewor14/spark into typos a81ae8f [Andrew Or] Some typos Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5231a3f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5231a3f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5231a3f2 Branch: refs/heads/master Commit: 5231a3f228b5482cba09ae23a9f68498eba03c88 Parents: 26f092d Author: Andrew Or andrewo...@gmail.com Authored: Thu Oct 30 15:32:11 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:32:11 2014 -0700 -- .../main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../main/scala/org/apache/spark/scheduler/TaskScheduler.scala| 4 ++-- .../apache/spark/deploy/yarn/ApplicationMasterArguments.scala| 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 3f345ce..4b5be68 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -93,7 +93,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul } } catch { case cnd: ClassNotFoundException = -// Log an error but keep going here -- the task failed, so not catastropic if we can't +// Log an error but keep going here -- the task failed, so not catastrophic if we can't // deserialize the reason. val loader = Utils.getContextOrSparkClassLoader logError( http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index a129a43..f095915 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -23,7 +23,7 @@ import org.apache.spark.storage.BlockManagerId /** * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. - * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks + * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks * for a single SparkContext. These schedulers get sets of tasks submitted to them from the * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running * them, retrying if there are failures, and mitigating stragglers. They return events to the @@ -41,7 +41,7 @@ private[spark] trait TaskScheduler { // Invoked after system has successfully initialized (typically in spark context). 
// Yarn uses this to bootstrap allocation of resources based on preferred locations, - // wait for slave registerations, etc. + // wait for slave registrations, etc. def postStartHook() { } // Disconnect from the cluster. http://git-wip-us.apache.org/repos/asf/spark/blob/5231a3f2/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 104db4f..8b32c76 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -82,7 +82,7 @@ class ApplicationMasterArguments(val args: Array[String]) { | --jar JAR_PATH Path to your application's JAR file | --class CLASS_NAME Name of your application's main class | --args ARGS Arguments to
git commit: [SPARK-4155] Consolidate usages of "<driver>"
Repository: spark Updated Branches: refs/heads/master 5231a3f22 - 9334d6996 [SPARK-4155] Consolidate usages of driver We use \driver\ everywhere. Let's not do that. Author: Andrew Or and...@databricks.com Closes #3020 from andrewor14/consolidate-driver and squashes the following commits: c1c2204 [Andrew Or] Just use driver for local executor ID 3d751e9 [Andrew Or] Consolidate usages of driver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9334d699 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9334d699 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9334d699 Branch: refs/heads/master Commit: 9334d699671edd8f18370255017ad40c1d0340ee Parents: 5231a3f Author: Andrew Or and...@databricks.com Authored: Thu Oct 30 15:32:46 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:32:46 2014 -0700 -- .../org/apache/spark/ExecutorAllocationManager.scala | 2 +- .../src/main/scala/org/apache/spark/SparkContext.scala | 2 ++ core/src/main/scala/org/apache/spark/SparkEnv.scala| 2 +- .../apache/spark/scheduler/local/LocalBackend.scala| 4 ++-- .../org/apache/spark/storage/BlockManagerId.scala | 3 ++- .../apache/spark/storage/StorageStatusListener.scala | 13 ++--- .../scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 6 ++ .../spark/storage/BlockManagerReplicationSuite.scala | 8 +--- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++ 9 files changed, 23 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b2cf022..c11f1db 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { val executorId = blockManagerAdded.blockManagerId.executorId - if (executorId != driver) { + if (executorId != SparkContext.DRIVER_IDENTIFIER) { allocationManager.onExecutorAdded(executorId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 73668e8..6bfcd8c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1333,6 +1333,8 @@ object SparkContext extends Logging { private[spark] val SPARK_UNKNOWN_USER = unknown + private[spark] val DRIVER_IDENTIFIER = driver + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/SparkEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 6a6dfda..557d2f5 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -156,7 +156,7 @@ object SparkEnv extends Logging { 
assert(conf.contains(spark.driver.port), spark.driver.port is not set on the driver!) val hostname = conf.get(spark.driver.host) val port = conf.get(spark.driver.port).toInt -create(conf, driver, hostname, port, true, isLocal, listenerBus) +create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9334d699/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 58b78f0..c026483 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import
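The refactoring pattern here is simply replacing a magic string that several subsystems must agree on with one named constant. A generic illustration follows; the real constant is SparkContext.DRIVER_IDENTIFIER, and the executor-ID comparison mirrors the ExecutorAllocationManager change above.

```scala
object Identifiers {
  // Single definition that every subsystem compares against, instead of a repeated literal.
  val DRIVER_IDENTIFIER = "<driver>"
}

object ExecutorEvents {
  def onBlockManagerAdded(executorId: String): Unit =
    if (executorId != Identifiers.DRIVER_IDENTIFIER)
      println(s"tracking new executor $executorId")
    else
      println("ignoring the driver's own block manager")
}

object DriverIdentifierDemo extends App {
  ExecutorEvents.onBlockManagerAdded("<driver>")
  ExecutorEvents.onBlockManagerAdded("executor-1")
}
```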
git commit: Minor style hot fix after #2711
Repository: spark Updated Branches: refs/heads/master 9334d6996 - 849b43ec0 Minor style hot fix after #2711 I had planned to fix this when I merged it but I forgot to. witgo Author: Andrew Or and...@databricks.com Closes #3018 from andrewor14/command-utils-style and squashes the following commits: c2959fb [Andrew Or] Style hot fix Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849b43ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849b43ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849b43ec Branch: refs/heads/master Commit: 849b43ec0f9e4f2ef962a054eb78cd0fc94a142a Parents: 9334d69 Author: Andrew Or and...@databricks.com Authored: Thu Oct 30 15:33:34 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:33:34 2014 -0700 -- .../org/apache/spark/deploy/worker/CommandUtils.scala | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/849b43ec/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index aba2e20..28e9662 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -37,12 +37,12 @@ object CommandUtils extends Logging { * The `env` argument is exposed for testing. */ def buildProcessBuilder( -command: Command, -memory: Int, -sparkHome: String, -substituteArguments: String = String, -classPaths: Seq[String] = Seq[String](), -env: Map[String, String] = sys.env): ProcessBuilder = { + command: Command, + memory: Int, + sparkHome: String, + substituteArguments: String = String, + classPaths: Seq[String] = Seq[String](), + env: Map[String, String] = sys.env): ProcessBuilder = { val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env) val commandSeq = buildCommandSeq(localCommand, memory, sparkHome) val builder = new ProcessBuilder(commandSeq: _*) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4153][WebUI] Update the sort keys for HistoryPage
Repository: spark Updated Branches: refs/heads/master 849b43ec0 - d34505783 [SPARK-4153][WebUI] Update the sort keys for HistoryPage Sort Started, Completed, Duration and Last Updated by time. Author: zsxwing zsxw...@gmail.com Closes #3014 from zsxwing/SPARK-4153 and squashes the following commits: ec8b9ad [zsxwing] Sort Started, Completed, Duration and Last Updated by time Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3450578 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3450578 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3450578 Branch: refs/heads/master Commit: d3450578357d6f7598243ee2ab11c338085ad9c1 Parents: 849b43e Author: zsxwing zsxw...@gmail.com Authored: Thu Oct 30 15:33:56 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:33:56 2014 -0700 -- .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3450578/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index d25c291..0e249e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -84,11 +84,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage() { tr tda href={uiAddress}{info.id}/a/td td{info.name}/td - td{startTime}/td - td{endTime}/td - td{duration}/td + td sorttable_customkey={info.startTime.toString}{startTime}/td + td sorttable_customkey={info.endTime.toString}{endTime}/td + td sorttable_customkey={(info.endTime - info.startTime).toString}{duration}/td td{info.sparkUser}/td - td{lastUpdated}/td + td sorttable_customkey={info.lastUpdated.toString}{lastUpdated}/td /tr } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
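sorttable_customkey tells the page's sorttable.js to sort a column by the supplied key rather than by the rendered text, so timestamps and durations order numerically instead of lexicographically. A minimal Scala XML illustration of building such a cell, with toy values; on Scala 2.11+ this needs the scala-xml module.

```scala
object SortKeyCellDemo extends App {
  // Toy stand-ins for the values the history page renders.
  val startTime = 1414700000000L
  val endTime   = 1414700123000L
  val duration  = endTime - startTime
  val durationText = s"${duration / 1000} s"

  // The human-readable text is displayed, while sorttable_customkey carries
  // the numeric key the JavaScript sorter actually compares.
  val cell = <td sorttable_customkey={duration.toString}>{durationText}</td>
  println(cell)   // <td sorttable_customkey="123000">123 s</td>
}
```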
git commit: [SPARK-3661] Respect spark.*.memory in cluster mode
Repository: spark Updated Branches: refs/heads/master d34505783 - 2f5454381 [SPARK-3661] Respect spark.*.memory in cluster mode This also includes minor re-organization of the code. Tested locally in both client and deploy modes. Author: Andrew Or and...@databricks.com Author: Andrew Or andrewo...@gmail.com Closes #2697 from andrewor14/memory-cluster-mode and squashes the following commits: 01d78bc [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode ccd468b [Andrew Or] Add some comments per Patrick c956577 [Andrew Or] Tweak wording 2b4afa0 [Andrew Or] Unused import 47a5a88 [Andrew Or] Correct Spark properties precedence order bf64717 [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode dd452d0 [Andrew Or] Respect spark.*.memory in cluster mode Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f545438 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f545438 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f545438 Branch: refs/heads/master Commit: 2f54543815c0905dc958d444ad638c23a29507c6 Parents: d345057 Author: Andrew Or and...@databricks.com Authored: Thu Oct 30 15:44:29 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:44:29 2014 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +-- .../spark/deploy/SparkSubmitArguments.scala | 74 2 files changed, 45 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f545438/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0379ade..b43e68e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -274,17 +274,11 @@ object SparkSubmit { } } -// Properties given with --conf are superceded by other options, but take precedence over -// properties in the defaults file. 
+// Load any properties specified through --conf and the default properties file for ((k, v) - args.sparkProperties) { sysProps.getOrElseUpdate(k, v) } -// Read from default spark properties, if any -for ((k, v) - args.defaultSparkProperties) { - sysProps.getOrElseUpdate(k, v) -} - // Resolve paths in certain spark properties val pathConfigs = Seq( spark.jars, http://git-wip-us.apache.org/repos/asf/spark/blob/2f545438/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 72a452e..f0e9ee6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy import java.util.jar.JarFile -import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.util.Utils @@ -72,39 +71,54 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St defaultProperties } - // Respect SPARK_*_MEMORY for cluster mode - driverMemory = sys.env.get(SPARK_DRIVER_MEMORY).orNull - executorMemory = sys.env.get(SPARK_EXECUTOR_MEMORY).orNull - + // Set parameters from command line arguments parseOpts(args.toList) - mergeSparkProperties() + // Populate `sparkProperties` map from properties file + mergeDefaultSparkProperties() + // Use `sparkProperties` map along with env vars to fill in any missing parameters + loadEnvironmentArguments() + checkRequiredArguments() /** - * Fill in any undefined values based on the default properties file or options passed in through - * the '--conf' flag. + * Merge values from the default properties file with those specified through --conf. + * When this is called, `sparkProperties` is already filled with configs from the latter. */ - private def mergeSparkProperties(): Unit = { + private def mergeDefaultSparkProperties(): Unit = { // Use common defaults file, if not specified by user propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env)) +// Honor --conf before the defaults file +defaultSparkProperties.foreach { case (k, v) = + if (!sparkProperties.contains(k)) { +sparkProperties(k) = v + } +} + } -val
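The resulting precedence for memory settings can be modelled as a simple orElse chain: an explicit flag wins, then a property from --conf or the defaults file, then the legacy environment variable. A hedged sketch with illustrative names, not the SparkSubmitArguments code itself:

```scala
object MemorySettingResolution {
  /** Rough model of the precedence established by the patch for driver memory:
    * --driver-memory flag, then spark.driver.memory from --conf or the defaults
    * file, then the SPARK_DRIVER_MEMORY environment variable. */
  def resolveDriverMemory(
      cliValue: Option[String],
      sparkProperties: Map[String, String],
      env: Map[String, String]): Option[String] =
    cliValue
      .orElse(sparkProperties.get("spark.driver.memory"))
      .orElse(env.get("SPARK_DRIVER_MEMORY"))

  def main(args: Array[String]): Unit = {
    val props = Map("spark.driver.memory" -> "4g")
    val env = Map("SPARK_DRIVER_MEMORY" -> "1g")
    println(resolveDriverMemory(None, props, env))        // Some(4g): property beats env var
    println(resolveDriverMemory(Some("8g"), props, env))   // Some(8g): flag beats everything
  }
}
```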
git commit: SPARK-1209 [CORE] SparkHadoop{MapRed, MapReduce}Util should not use package org.apache.hadoop
Repository: spark Updated Branches: refs/heads/master 2f5454381 - 68cb69daf SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop (This is just a look at what completely moving the classes would look like. I know Patrick flagged that as maybe not OK, although, it's private?) Author: Sean Owen so...@cloudera.com Closes #2814 from srowen/SPARK-1209 and squashes the following commits: ead1115 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though? 2d42c1d [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68cb69da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68cb69da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68cb69da Branch: refs/heads/master Commit: 68cb69daf3022e973422e496ccf827ca3806ff30 Parents: 2f54543 Author: Sean Owen so...@cloudera.com Authored: Thu Oct 30 15:54:53 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 15:54:53 2014 -0700 -- .../hadoop/mapred/SparkHadoopMapRedUtil.scala | 54 - .../mapreduce/SparkHadoopMapReduceUtil.scala| 79 --- .../org/apache/spark/SparkHadoopWriter.scala| 1 + .../spark/mapred/SparkHadoopMapRedUtil.scala| 56 ++ .../mapreduce/SparkHadoopMapReduceUtil.scala| 80 .../org/apache/spark/rdd/NewHadoopRDD.scala | 1 + .../org/apache/spark/rdd/PairRDDFunctions.scala | 3 +- project/MimaExcludes.scala | 8 ++ .../sql/parquet/ParquetTableOperations.scala| 1 + .../spark/sql/hive/hiveWriterContainers.scala | 1 + 10 files changed, 150 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68cb69da/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala -- diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala deleted file mode 100644 index 0c47afa..000 --- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.mapred - -private[apache] -trait SparkHadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = { -val klass = firstAvailableClass(org.apache.hadoop.mapred.JobContextImpl, - org.apache.hadoop.mapred.JobContext) -val ctor = klass.getDeclaredConstructor(classOf[JobConf], - classOf[org.apache.hadoop.mapreduce.JobID]) -ctor.newInstance(conf, jobId).asInstanceOf[JobContext] - } - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = { -val klass = firstAvailableClass(org.apache.hadoop.mapred.TaskAttemptContextImpl, - org.apache.hadoop.mapred.TaskAttemptContext) -val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID]) -ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] - } - - def newTaskAttemptID( - jtIdentifier: String, - jobId: Int, - isMap: Boolean, - taskId: Int, - attemptId: Int) = { -new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) - } - - private def firstAvailableClass(first: String, second: String): Class[_] = { -try { - Class.forName(first) -} catch { - case e: ClassNotFoundException = -Class.forName(second) -} - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/68cb69da/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala -- diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
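The trait shown above (and its mapreduce counterpart) locates Hadoop's context classes by name at runtime, so one Spark build can run against both the Hadoop 1.x and 2.x APIs. A minimal standalone sketch of that reflection fallback, assuming the same class names as the diff (actually resolving them requires Hadoop on the classpath):

object HadoopClassFallback {
  // Try the Hadoop 2.x implementation class first, then fall back to the 1.x name.
  private def firstAvailableClass(first: String, second: String): Class[_] =
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException => Class.forName(second)
    }

  // Resolves to JobContextImpl on Hadoop 2.x and JobContext on 1.x.
  def jobContextClass: Class[_] =
    firstAvailableClass(
      "org.apache.hadoop.mapred.JobContextImpl",
      "org.apache.hadoop.mapred.JobContext")
}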
git commit: [SPARK-4120][SQL] Join of multiple tables with syntax like SELECT .. FROM T1, T2, T3.. does not work in SparkSQL
Repository: spark Updated Branches: refs/heads/master 68cb69daf - 9b6ebe33d [SPARK-4120][SQL] Join of multiple tables with syntax like SELECT .. FROM T1,T2,T3.. does not work in SparkSQL Right now it works for only 2 tables, like the query below: sql("SELECT * FROM records1 as a,records2 as b where a.key=b.key") But it does not work for more than 2 tables, like this query: sql("SELECT * FROM records1 as a,records2 as b,records3 as c where a.key=b.key and a.key=c.key"). Author: ravipesala ravindra.pes...@huawei.com Closes #2987 from ravipesala/multijoin and squashes the following commits: 429b005 [ravipesala] Support multiple joins Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b6ebe33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b6ebe33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b6ebe33 Branch: refs/heads/master Commit: 9b6ebe33db27be38c3036ffeda17096043fb0fb9 Parents: 68cb69d Author: ravipesala ravindra.pes...@huawei.com Authored: Thu Oct 30 17:15:45 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Thu Oct 30 17:15:45 2014 -0700 -- .../scala/org/apache/spark/sql/catalyst/SqlParser.scala | 3 ++- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 10 ++ 2 files changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b6ebe33/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 0acf725..942b843 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -166,7 +166,8 @@ class SqlParser extends AbstractSparkSQLParser { // Based very loosely on the MySQL Grammar. // http://dev.mysql.com/doc/refman/5.0/en/join.html protected lazy val relations: Parser[LogicalPlan] = -( relation ~ ("," ~> relation) ^^ { case r1 ~ r2 => Join(r1, r2, Inner, None) } +( relation ~ rep1("," ~> relation) ^^ { +case r1 ~ joins => joins.foldLeft(r1) { case (lhs, r) => Join(lhs, r, Inner, None) } } | relation ) http://git-wip-us.apache.org/repos/asf/spark/blob/9b6ebe33/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1034c2d..4c36ca0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -899,4 +899,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("SPARK-3814 Support Bitwise ~ operator") { checkAnswer(sql("SELECT ~key FROM testData WHERE key = 1 "), -2) } + + test("SPARK-4120 Join of multiple tables does not work in SparkSQL") { +checkAnswer( + sql( +"""SELECT a.key, b.key, c.key + |FROM testData a,testData b,testData c + |where a.key = b.key and a.key = c.key +""".stripMargin), + (1 to 100).map(i => Seq(i, i, i))) + } }
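The parser change above replaces a single optional comma-separated relation with rep1 and then folds the collected relations into a left-deep tree of inner joins, which is why any number of comma-separated tables now parses. A small sketch of that fold using a toy plan type (Table and InnerJoin are stand-ins for illustration, not Catalyst classes):

sealed trait Plan
case class Table(name: String) extends Plan
case class InnerJoin(left: Plan, right: Plan) extends Plan // join condition comes from WHERE later

// FROM T1,T2,T3 becomes InnerJoin(InnerJoin(Table("T1"), Table("T2")), Table("T3"))
def joinAll(first: Plan, rest: Seq[Plan]): Plan =
  rest.foldLeft(first)((lhs, r) => InnerJoin(lhs, r))

val plan = joinAll(Table("T1"), Seq(Table("T2"), Table("T3")))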
git commit: [SPARK-3968][SQL] Use parquet-mr filter2 api
Repository: spark Updated Branches: refs/heads/master 9b6ebe33d - 2e35e2429 [SPARK-3968][SQL] Use parquet-mr filter2 api The parquet-mr project has introduced a new filter api (https://github.com/apache/incubator-parquet-mr/pull/4), along with several fixes . It can also eliminate entire RowGroups depending on certain statistics like min/max We can leverage that to further improve performance of queries with filters. Also filter2 api introduces ability to create custom filters. We can create a custom filter for the optimized In clause (InSet) , so that elimination happens in the ParquetRecordReader itself Author: Yash Datta yash.da...@guavus.com Closes #2841 from saucam/master and squashes the following commits: 8282ba0 [Yash Datta] SPARK-3968: fix scala code style and add some more tests for filtering on optional columns 515df1c [Yash Datta] SPARK-3968: Add a test case for filter pushdown on optional column 5f4530e [Yash Datta] SPARK-3968: Fix scala code style f304667 [Yash Datta] SPARK-3968: Using task metadata strategy for row group filtering ec53e92 [Yash Datta] SPARK-3968: No push down should result in case we are unable to create a record filter 48163c3 [Yash Datta] SPARK-3968: Code cleanup cc7b596 [Yash Datta] SPARK-3968: 1. Fix RowGroupFiltering not working 2. Use the serialization/deserialization from Parquet library for filter pushdown caed851 [Yash Datta] Revert SPARK-3968: Not pushing the filters in case of OPTIONAL columns since filtering on optional columns is now supported in filter2 api 49703c9 [Yash Datta] SPARK-3968: Not pushing the filters in case of OPTIONAL columns 9d09741 [Yash Datta] SPARK-3968: Change parquet filter pushdown to use filter2 api of parquet-mr Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e35e242 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e35e242 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e35e242 Branch: refs/heads/master Commit: 2e35e24294ad8a5e76c89ea888fe330052dabd5a Parents: 9b6ebe3 Author: Yash Datta yash.da...@guavus.com Authored: Thu Oct 30 17:17:24 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Thu Oct 30 17:17:31 2014 -0700 -- pom.xml | 2 +- .../spark/sql/parquet/ParquetFilters.scala | 230 +++ .../sql/parquet/ParquetTableOperations.scala| 179 --- .../spark/sql/parquet/ParquetTestData.scala | 19 ++ .../spark/sql/parquet/ParquetQuerySuite.scala | 57 + 5 files changed, 308 insertions(+), 179 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2e35e242/pom.xml -- diff --git a/pom.xml b/pom.xml index e4c9247..379274d 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ !-- Version used for internal directory structure -- hive.version.short0.13.1/hive.version.short derby.version10.10.1.1/derby.version -parquet.version1.4.3/parquet.version +parquet.version1.6.0rc3/parquet.version jblas.version1.2.3/jblas.version jetty.version8.1.14.v20131031/jetty.version chill.version0.3.6/chill.version http://git-wip-us.apache.org/repos/asf/spark/blob/2e35e242/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 7c83f1c..517a5cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -21,8 +21,12 @@ import 
java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration -import parquet.filter._ -import parquet.filter.ColumnPredicates._ +import parquet.filter2.compat.FilterCompat +import parquet.filter2.compat.FilterCompat._ +import parquet.filter2.predicate.FilterPredicate +import parquet.filter2.predicate.FilterApi +import parquet.filter2.predicate.FilterApi._ +import parquet.io.api.Binary import parquet.column.ColumnReader import com.google.common.io.BaseEncoding @@ -38,67 +42,74 @@ private[sql] object ParquetFilters { // set this to false if pushdown should be disabled val PARQUET_FILTER_PUSHDOWN_ENABLED = spark.sql.hints.parquetFilterPushdown - def createRecordFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { + def createRecordFilter(filterExpressions: Seq[Expression]): Filter = { val filters: Seq[CatalystFilter] = filterExpressions.collect { case (expression: Expression) if createFilter(expression).isDefined =
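With the filter2 API imported above, pushdown predicates become serializable objects built through FilterApi and handed to the record reader via FilterCompat, which is what allows whole row groups to be skipped based on their min/max statistics. A hedged sketch of building such a predicate directly (the column names are made up for illustration; this is not Spark's ParquetFilters code):

import parquet.filter2.compat.FilterCompat
import parquet.filter2.predicate.FilterApi

// Roughly equivalent to pushing down "key = 1 AND value > 10"; the reader can drop
// any row group whose column statistics make this predicate unsatisfiable.
val keyEq = FilterApi.eq(FilterApi.intColumn("key"), java.lang.Integer.valueOf(1))
val valueGt = FilterApi.gt(FilterApi.intColumn("value"), java.lang.Integer.valueOf(10))
val rowGroupFilter: FilterCompat.Filter = FilterCompat.get(FilterApi.and(keyEq, valueGt))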
git commit: Revert "SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop"
Repository: spark Updated Branches: refs/heads/master 2e35e2429 - 26d31d15f Revert SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop This reverts commit 68cb69daf3022e973422e496ccf827ca3806ff30. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26d31d15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26d31d15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26d31d15 Branch: refs/heads/master Commit: 26d31d15fda3f63707a28d1a1115770ad127cf8f Parents: 2e35e24 Author: Andrew Or and...@databricks.com Authored: Thu Oct 30 17:56:10 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Thu Oct 30 17:56:10 2014 -0700 -- .../hadoop/mapred/SparkHadoopMapRedUtil.scala | 54 + .../mapreduce/SparkHadoopMapReduceUtil.scala| 79 +++ .../org/apache/spark/SparkHadoopWriter.scala| 1 - .../spark/mapred/SparkHadoopMapRedUtil.scala| 56 -- .../mapreduce/SparkHadoopMapReduceUtil.scala| 80 .../org/apache/spark/rdd/NewHadoopRDD.scala | 1 - .../org/apache/spark/rdd/PairRDDFunctions.scala | 3 +- project/MimaExcludes.scala | 8 -- .../sql/parquet/ParquetTableOperations.scala| 1 - .../spark/sql/hive/hiveWriterContainers.scala | 1 - 10 files changed, 134 insertions(+), 150 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala -- diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala new file mode 100644 index 000..0c47afa --- /dev/null +++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred + +private[apache] +trait SparkHadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = { +val klass = firstAvailableClass(org.apache.hadoop.mapred.JobContextImpl, + org.apache.hadoop.mapred.JobContext) +val ctor = klass.getDeclaredConstructor(classOf[JobConf], + classOf[org.apache.hadoop.mapreduce.JobID]) +ctor.newInstance(conf, jobId).asInstanceOf[JobContext] + } + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = { +val klass = firstAvailableClass(org.apache.hadoop.mapred.TaskAttemptContextImpl, + org.apache.hadoop.mapred.TaskAttemptContext) +val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID]) +ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] + } + + def newTaskAttemptID( + jtIdentifier: String, + jobId: Int, + isMap: Boolean, + taskId: Int, + attemptId: Int) = { +new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + } + + private def firstAvailableClass(first: String, second: String): Class[_] = { +try { + Class.forName(first) +} catch { + case e: ClassNotFoundException = +Class.forName(second) +} + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala -- diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala new file mode 100644 index 000..1fca572 --- /dev/null +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF
git commit: HOTFIX: Clean up build in network module.
Repository: spark Updated Branches: refs/heads/master 26d31d15f - 0734d0932 HOTFIX: Clean up build in network module. This is currently breaking the package build for some people (including me). This patch does some general clean-up which also fixes the current issue. - Uses consistent artifact naming - Adds sbt support for this module - Changes tests to use scalatest (fixes the original issue[1]) One thing to note, it turns out that scalatest when invoked in the Maven build doesn't succesfully detect JUnit Java tests. This is a long standing issue, I noticed it applies to all of our current test suites as well. I've created SPARK-4159 to fix this. [1] The original issue is that we need to allocate extra memory for the tests, happens by default in our scalatest configuration. Author: Patrick Wendell pwend...@gmail.com Closes #3025 from pwendell/hotfix and squashes the following commits: faa9053 [Patrick Wendell] HOTFIX: Clean up build in network module. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0734d093 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0734d093 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0734d093 Branch: refs/heads/master Commit: 0734d09320fe37edd3a02718511cda0bda852478 Parents: 26d31d1 Author: Patrick Wendell pwend...@gmail.com Authored: Thu Oct 30 20:15:36 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Oct 30 20:15:36 2014 -0700 -- core/pom.xml | 2 +- network/common/pom.xml | 34 +- project/SparkBuild.scala | 8 +--- 3 files changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 8020a2d..6963ce4 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -46,7 +46,7 @@ /dependency dependency groupIdorg.apache.spark/groupId - artifactIdnetwork/artifactId + artifactIdspark-network-common_2.10/artifactId version${project.version}/version /dependency dependency http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/network/common/pom.xml -- diff --git a/network/common/pom.xml b/network/common/pom.xml index e3b7e32..a33e44b 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -27,12 +27,12 @@ /parent groupIdorg.apache.spark/groupId - artifactIdnetwork/artifactId + artifactIdspark-network-common_2.10/artifactId packagingjar/packaging - nameShuffle Streaming Service/name + nameSpark Project Common Network Code/name urlhttp://spark.apache.org//url properties -sbt.project.namenetwork/sbt.project.name +sbt.project.namenetwork-common/sbt.project.name /properties dependencies @@ -60,6 +60,11 @@ scopetest/scope /dependency dependency + groupIdcom.novocode/groupId + artifactIdjunit-interface/artifactId + scopetest/scope +/dependency +dependency groupIdlog4j/groupId artifactIdlog4j/artifactId scopetest/scope @@ -69,25 +74,20 @@ artifactIdmockito-all/artifactId scopetest/scope /dependency +dependency + groupIdorg.scalatest/groupId + artifactIdscalatest_${scala.binary.version}/artifactId + scopetest/scope +/dependency /dependencies - build -outputDirectorytarget/java/classes/outputDirectory -testOutputDirectorytarget/java/test-classes/testOutputDirectory + outputDirectorytarget/scala-${scala.binary.version}/classes/outputDirectory + testOutputDirectorytarget/scala-${scala.binary.version}/test-classes/testOutputDirectory plugins plugin -groupIdorg.apache.maven.plugins/groupId -artifactIdmaven-surefire-plugin/artifactId -version2.17/version -configuration 
- skipTestsfalse/skipTests - includes -include**/Test*.java/include -include**/*Test.java/include -include**/*Suite.java/include - /includes -/configuration +groupIdorg.scalatest/groupId +artifactIdscalatest-maven-plugin/artifactId /plugin /plugins /build http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6d5eb68..7708351 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -31,10 +31,10 @@ object BuildCommons { private val buildLocation = file(.).getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive,
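On the sbt side, "Adds sbt support for this module" amounts to declaring the new network-common project in project/SparkBuild.scala and making core depend on it. A rough sbt 0.13-style sketch under that assumption (the object name and wiring are illustrative, not the actual SparkBuild definitions):

import sbt._

object ExampleSparkBuild extends Build {
  // New module living at network/common, published as spark-network-common_2.10.
  lazy val networkCommon = Project("network-common", file("network/common"))

  // core consumes the common network code.
  lazy val core = Project("core", file("core")).dependsOn(networkCommon)
}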
git commit: [SPARK-4124] [MLlib] [PySpark] simplify serialization in MLlib Python API
Repository: spark Updated Branches: refs/heads/master 0734d0932 - 872fc669b [SPARK-4124] [MLlib] [PySpark] simplify serialization in MLlib Python API Create several helper functions to call MLlib Java API, convert the arguments to Java type and convert return value to Python object automatically, this simplify serialization in MLlib Python API very much. After this, the MLlib Python API does not need to deal with serialization details anymore, it's easier to add new API. cc mengxr Author: Davies Liu dav...@databricks.com Closes #2995 from davies/cleanup and squashes the following commits: 8fa6ec6 [Davies Liu] address comments 16b85a0 [Davies Liu] Merge branch 'master' of github.com:apache/spark into cleanup 43743e5 [Davies Liu] bugfix 731331f [Davies Liu] simplify serialization in MLlib Python API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/872fc669 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/872fc669 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/872fc669 Branch: refs/heads/master Commit: 872fc669b497fb255db3212568f2a14c2ba0d5db Parents: 0734d09 Author: Davies Liu dav...@databricks.com Authored: Thu Oct 30 22:25:18 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Oct 30 22:25:18 2014 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 84 ++-- python/pyspark/mllib/classification.py | 30 ++--- python/pyspark/mllib/clustering.py | 15 +-- python/pyspark/mllib/common.py | 135 +++ python/pyspark/mllib/feature.py | 122 +++-- python/pyspark/mllib/linalg.py | 12 -- python/pyspark/mllib/random.py | 34 ++--- python/pyspark/mllib/recommendation.py | 62 ++--- python/pyspark/mllib/regression.py | 52 +++ python/pyspark/mllib/stat.py| 65 ++--- python/pyspark/mllib/tree.py| 55 ++-- python/pyspark/mllib/util.py| 7 +- 12 files changed, 287 insertions(+), 386 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/872fc669/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 485abe2..acdc67d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.api.python import java.io.OutputStream -import java.util.{ArrayList = JArrayList} +import java.util.{ArrayList = JArrayList, List = JList, Map = JMap} import scala.collection.JavaConverters._ import scala.language.existentials @@ -72,15 +72,11 @@ class PythonMLLibAPI extends Serializable { private def trainRegressionModel( learner: GeneralizedLinearAlgorithm[_ : GeneralizedLinearModel], data: JavaRDD[LabeledPoint], - initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = { -val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector] + initialWeights: Vector): JList[Object] = { // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD. 
learner.disableUncachedWarning() val model = learner.run(data.rdd, initialWeights) -val ret = new java.util.LinkedList[java.lang.Object]() -ret.add(SerDe.dumps(model.weights)) -ret.add(model.intercept: java.lang.Double) -ret +List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava } /** @@ -91,10 +87,10 @@ class PythonMLLibAPI extends Serializable { numIterations: Int, stepSize: Double, miniBatchFraction: Double, - initialWeightsBA: Array[Byte], + initialWeights: Vector, regParam: Double, regType: String, - intercept: Boolean): java.util.List[java.lang.Object] = { + intercept: Boolean): JList[Object] = { val lrAlg = new LinearRegressionWithSGD() lrAlg.setIntercept(intercept) lrAlg.optimizer @@ -113,7 +109,7 @@ class PythonMLLibAPI extends Serializable { trainRegressionModel( lrAlg, data, - initialWeightsBA) + initialWeights) } /** @@ -125,7 +121,7 @@ class PythonMLLibAPI extends Serializable { stepSize: Double, regParam: Double, miniBatchFraction: Double, - initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { + initialWeights: Vector): JList[Object] = { val lassoAlg = new
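The pattern in the diff above is that the Scala entry points now accept and return ordinary JVM types (Vector, JList[Object]) and leave the conversion to a small set of Python helpers, instead of hand-rolling byte-array serialization in every method. A minimal sketch of that return-value convention (the case class and method are hypothetical, not part of PythonMLLibAPI):

import java.util.{List => JList}
import scala.collection.JavaConverters._

case class FittedModel(weights: Array[Double], intercept: Double)

// Hand Py4J a plain java.util.List; the Python side unpacks elements 0 and 1.
def modelToJava(model: FittedModel): JList[Object] =
  List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava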
git commit: [SPARK-3250] Implement Gap Sampling optimization for random sampling
Repository: spark Updated Branches: refs/heads/master 872fc669b - ad3bd0dff [SPARK-3250] Implement Gap Sampling optimization for random sampling More efficient sampling, based on Gap Sampling optimization: http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/ Author: Erik Erlandson eerla...@redhat.com Closes #2455 from erikerlandson/spark-3250-pr and squashes the following commits: 72496bc [Erik Erlandson] [SPARK-3250] Implement Gap Sampling optimization for random sampling Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3bd0df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3bd0df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3bd0df Branch: refs/heads/master Commit: ad3bd0dff8997861c5a04438145ba6f91c57a849 Parents: 872fc66 Author: Erik Erlandson eerla...@redhat.com Authored: Thu Oct 30 22:30:52 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Oct 30 22:30:52 2014 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 6 +- .../spark/util/random/RandomSampler.scala | 286 - .../java/org/apache/spark/JavaAPISuite.java | 9 +- .../spark/util/random/RandomSamplerSuite.scala | 606 --- .../org/apache/spark/mllib/util/MLUtils.scala | 4 +- 5 files changed, 790 insertions(+), 121 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b7f125d..c169b2d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite} import org.apache.spark.util.collection.OpenHashMap -import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils} +import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler, + SamplingUtils} /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
Represents an immutable, @@ -375,7 +376,8 @@ abstract class RDD[T: ClassTag]( val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x = - new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed) + new PartitionwiseSampledRDD[T, T]( +this, new BernoulliCellSampler[T](x(0), x(1)), true, seed) }.toArray } http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index ee389de..76e7a27 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -19,6 +19,9 @@ package org.apache.spark.util.random import java.util.Random +import scala.reflect.ClassTag +import scala.collection.mutable.ArrayBuffer + import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.annotation.DeveloperApi @@ -38,13 +41,47 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable /** take a random sample */ def sample(items: Iterator[T]): Iterator[U] + /** return a copy of the RandomSampler object */ override def clone: RandomSampler[T, U] = throw new NotImplementedError(clone() is not implemented.) } +private[spark] +object RandomSampler { + /** Default random number generator used by random samplers. */ + def newDefaultRNG: Random = new XORShiftRandom + + /** + * Default maximum gap-sampling fraction. + * For sampling fractions = this value, the gap sampling optimization will be applied. + * Above this value, it is assumed that tradtional Bernoulli sampling is faster. The + * optimal value for this will depend on the RNG. More expensive RNGs will tend to make + * the optimal value higher. The most reliable way to determine this value for a new RNG + * is to experiment. When tuning for a new RNG, I would expect a value of 0.5 to be close + * in most cases, as an initial guess. + */ + val defaultMaxGapSamplingFraction = 0.4 + + /** + * Default epsilon for floating point numbers sampled from the RNG. + * The
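The optimization referenced above trades one RNG call per scanned element for one per accepted element: when the sampling fraction f is small, the number of elements rejected before the next acceptance is geometrically distributed, so the sampler can draw that gap directly and skip ahead. A simplified sketch of the idea (not Spark's RandomSampler, which also handles sampling with replacement and the fraction/epsilon thresholds introduced above):

import scala.util.Random

// Gap sampling: for fraction f, the gap to the next accepted element can be drawn
// as floor(log(u) / log(1 - f)) with u uniform in (0, 1).
def gapSample[T](items: Iterator[T], f: Double, rng: Random = new Random): Iterator[T] = {
  require(f > 0.0 && f < 1.0, "sampling fraction must be in (0, 1)")
  val lnQ = math.log1p(-f) // log(1 - f), always negative
  // Add the smallest positive double so log(0) can never occur in this sketch.
  def nextGap(): Int = (math.log(rng.nextDouble() + Double.MinPositiveValue) / lnQ).toInt
  var skip = nextGap()
  items.filter { _ =>
    if (skip == 0) { skip = nextGap(); true }  // accept, then draw the next gap
    else { skip -= 1; false }                  // still inside the current gap
  }
}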