spark git commit: [SPARK-9015] [BUILD] Clean project import in scala ide

2015-07-16 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4ea6480a3 -> b536d5dc6


[SPARK-9015] [BUILD] Clean project import in scala ide

Cleanup maven for a clean import in scala-ide / eclipse.

* remove the groovy plugin, which is not needed at all
* remove add-source from build-helper-maven-plugin; recent versions of scala-maven-plugin add the Scala sources automatically
* add the lifecycle-mapping plugin to hide a few useless warnings from the IDE

Author: Jan Prach jen...@gmail.com

Closes #7375 from jendap/clean-project-import-in-scala-ide and squashes the 
following commits:

c4b4c0f [Jan Prach] fix whitespaces
5a83e07 [Jan Prach] Revert remove java compiler warnings from java tests
312007e [Jan Prach] scala-maven-plugin itself add scala sources by default
f47d856 [Jan Prach] remove spark-1.4-staging repository
c8a54db [Jan Prach] remove java compiler warnings from java tests
999a068 [Jan Prach] remove some maven warnings in scala ide
80fbdc5 [Jan Prach] remove groovy and gmavenplus plugin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b536d5dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b536d5dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b536d5dc

Branch: refs/heads/master
Commit: b536d5dc6c2c712270b8130ddd9945dff19a27d9
Parents: 4ea6480
Author: Jan Prach jen...@gmail.com
Authored: Thu Jul 16 18:42:41 2015 +0100
Committer: Sean Owen so...@cloudera.com
Committed: Thu Jul 16 18:42:41 2015 +0100

--
 pom.xml  | 130 --
 repl/pom.xml |   2 -
 sql/core/pom.xml |   1 -
 sql/hive/pom.xml |   1 -
 tools/pom.xml|   4 --
 5 files changed, 53 insertions(+), 85 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b536d5dc/pom.xml
--
diff --git a/pom.xml b/pom.xml
index aa49e2a..c5c6558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,6 @@
     <aws.kinesis.client.version>1.2.1</aws.kinesis.client.version>
     <commons.httpclient.version>4.3.2</commons.httpclient.version>
     <commons.math3.version>3.4.1</commons.math3.version>
-    <test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file>
     <scala.version>2.10.4</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
@@ -283,18 +282,6 @@
        <enabled>false</enabled>
      </snapshots>
    </repository>
-    <!-- TODO: This can be deleted after Spark 1.4 is posted -->
-    <repository>
-      <id>spark-1.4-staging</id>
-      <name>Spark 1.4 RC4 Staging Repository</name>
-      <url>https://repository.apache.org/content/repositories/orgapachespark-1112</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
@@ -319,17 +306,6 @@
      <version>1.0.0</version>
    </dependency>
    <!--
-      This depndency has been added to provided scope as it is needed for executing build
-      specific groovy scripts using gmaven+ and not required for downstream project building
-      with spark.
-    -->
-    <dependency>
-      <groupId>org.codehaus.groovy</groupId>
-      <artifactId>groovy-all</artifactId>
-      <version>2.3.7</version>
-      <scope>provided</scope>
-    </dependency>
-    <!--
      This is needed by the scalatest plugin, and so is declared here to be available in
      all child modules, just as scalatest is run in all children
    -->
@@ -1412,6 +1388,58 @@
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
+        <!-- This plugin's configuration is used to store Eclipse m2e settings only. -->
+        <!-- It has no influence on the Maven build itself. -->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.8,)</versionRange>
+                    <goals>
+                      <goal>build-classpath</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-jar-plugin</artifactId>
+

spark git commit: [SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table

2015-07-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master b536d5dc6 -> 43dac2c88


[SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table

JIRA: https://issues.apache.org/jira/browse/SPARK-6941
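
A minimal sketch (not part of the patch) of the situation the new check now reports cleanly; the local setup and temp table name are hypothetical (the suite itself uses TestSQLContext):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical local setup.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("spark-6941-sketch"))
val ctx = new SQLContext(sc)
import ctx.implicits._

// A DataFrame built directly from an RDD is not an InsertableRelation,
// so it cannot be the target of an INSERT.
ctx.sparkContext.parallelize(1 to 10).toDF("i").registerTempTable("rdd_base")

// Before this patch the INSERT failed with a confusing planner error; now the
// analyzer's PreWriteCheck fails fast with
// "Inserting into an RDD-based table is not allowed."
ctx.sql("INSERT INTO TABLE rdd_base SELECT i FROM rdd_base")
```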

Author: Yijie Shen henry.yijies...@gmail.com

Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits:

f82cbe7 [Yijie Shen] reorder import
dd67e40 [Yijie Shen] resolve comments
09518af [Yijie Shen] fix import order in DataframeSuite
0c635d4 [Yijie Shen] make match more specific
9df388d [Yijie Shen] move check into PreWriteCheck
847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43dac2c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43dac2c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43dac2c8

Branch: refs/heads/master
Commit: 43dac2c880d6f310a958531aee0bb4ac1d9b7025
Parents: b536d5d
Author: Yijie Shen henry.yijies...@gmail.com
Authored: Thu Jul 16 10:52:09 2015 -0700
Committer: Yin Huai yh...@databricks.com
Committed: Thu Jul 16 10:52:09 2015 -0700

--
 .../org/apache/spark/sql/sources/rules.scala|  9 +++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 55 ++--
 2 files changed, 60 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index a3fd7f1..40ee048 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
 
@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
+      case logical.InsertIntoTable(t, _, _, _, _) =>
+        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
+          failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+        } else {
+          // OK
+        }
+
       case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.

http://git-wip-us.apache.org/repos/asf/spark/blob/43dac2c8/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f592a99..23244fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.language.postfixOps
 
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
-
+import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
-class DataFrameSuite extends QueryTest {
+class DataFrameSuite extends QueryTest with SQLTestUtils {
   import org.apache.spark.sql.TestData._
 
   lazy val ctx = org.apache.spark.sql.test.TestSQLContext
   import ctx.implicits._
 
+  def sqlContext: SQLContext = ctx
+
   test("analysis error should be eagerly reported") {
 val oldSetting = ctx.conf.dataFrameEagerAnalysis
 // Eager analysis.
@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
     assert(f.getMessage.contains("column3"))
     assert(!f.getMessage.contains("column2"))
   }
+
+  test("SPARK-6941: Better error message for inserting into RDD-based Table") {
+    withTempDir { dir =>
+
+      val tempParquetFile = new File(dir, "tmp_parquet")
+      val tempJsonFile = new 

spark git commit: [SPARK-8807] [SPARKR] Add between operator in SparkR

2015-07-16 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master e27212317 -> 0a795336d


[SPARK-8807] [SPARKR] Add between operator in SparkR

JIRA: https://issues.apache.org/jira/browse/SPARK-8807

Add between operator in SparkR.
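
For reference, a hedged Scala sketch of the JVM `Column.between` method that the new R `between(x, bounds)` delegates to via `callJMethod`; the DataFrame and its `age` column are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: `df` is assumed to have a numeric `age` column.
// Column.between(lower, upper) yields a boolean Column; both bounds are inclusive.
def ageBetween20And30(df: DataFrame): DataFrame =
  df.select(col("age").between(20, 30))
```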

Author: Liang-Chi Hsieh vii...@appier.com

Closes #7356 from viirya/add_r_between and squashes the following commits:

7f51b44 [Liang-Chi Hsieh] Add test for non-numeric column.
c6a25c5 [Liang-Chi Hsieh] Add between function.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a795336
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a795336
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a795336

Branch: refs/heads/master
Commit: 0a795336df20c7ec969366e613286f0c060a4eeb
Parents: e272123
Author: Liang-Chi Hsieh vii...@appier.com
Authored: Wed Jul 15 23:36:57 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Wed Jul 15 23:36:57 2015 -0700

--
 R/pkg/NAMESPACE  |  1 +
 R/pkg/R/column.R | 17 +
 R/pkg/R/generics.R   |  4 
 R/pkg/inst/tests/test_sparkSQL.R | 12 
 4 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 7f85722..331307c 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -77,6 +77,7 @@ exportMethods("abs",
               "atan",
               "atan2",
               "avg",
+              "between",
               "cast",
               "cbrt",
               "ceiling",

http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/R/column.R
--
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
index 8e4b0f5..2892e14 100644
--- a/R/pkg/R/column.R
+++ b/R/pkg/R/column.R
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
             column(jc)
           })
 
+#' between
+#'
+#' Test if the column is between the lower bound and upper bound, inclusive.
+#'
+#' @rdname column
+#'
+#' @param bounds lower and upper bounds
+setMethod("between", signature(x = "Column"),
+          function(x, bounds) {
+            if (is.vector(bounds) && length(bounds) == 2) {
+              jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
+              column(jc)
+            } else {
+              stop("bounds should be a vector of lower and upper bounds")
+            }
+          })
+
 #' Casts the column to a different data type.
 #'
 #' @rdname column

http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index fad9d71..ebe6fbd 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -569,6 +569,10 @@ setGeneric("avg", function(x, ...) { standardGeneric("avg") })
 
 #' @rdname column
 #' @export
+setGeneric("between", function(x, bounds) { standardGeneric("between") })
+
+#' @rdname column
+#' @export
 setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
 
 #' @rdname column

http://git-wip-us.apache.org/repos/asf/spark/blob/0a795336/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 76f74f8..cdfe648 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -638,6 +638,18 @@ test_that("column functions", {
   c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
   c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
   c9 <- toDegrees(c) + toRadians(c)
+
+  df <- jsonFile(sqlContext, jsonPath)
+  df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
+  expect_equal(collect(df2)[[2, 1]], TRUE)
+  expect_equal(collect(df2)[[2, 2]], FALSE)
+  expect_equal(collect(df2)[[3, 1]], FALSE)
+  expect_equal(collect(df2)[[3, 2]], TRUE)
+
+  df3 <- select(df, between(df$name, c("Apache", "Spark")))
+  expect_equal(collect(df3)[[1, 1]], TRUE)
+  expect_equal(collect(df3)[[2, 1]], FALSE)
+  expect_equal(collect(df3)[[3, 1]], TRUE)
 })
 
 test_that("column binary mathfunctions", {





spark git commit: [SPARK-8972] [SQL] Incorrect result for rollup

2015-07-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master ba3309684 -> e27212317


[SPARK-8972] [SQL] Incorrect result for rollup

We don't support complex expression keys in rollup/cube, and we don't even report it when the GROUP BY keys are complex expressions, which causes very confusing/incorrect results.

e.g. `SELECT key%100 FROM src GROUP BY key%100 with ROLLUP`

This PR adds an additional projection during analysis for complex GROUP BY keys; that projection becomes the child of `Expand`, so from `Expand`'s point of view the GROUP BY keys are always simple keys (attribute references).
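
A sketch (not from the patch) of an affected query and the rewrite the analyzer now performs; it assumes a HiveContext named `hiveContext` and a Hive table `src(key INT, value STRING)` as in the Hive test suites:

```scala
// Conceptually, the analyzer now treats
//   SELECT key % 100 AS c1, count(1) FROM src GROUP BY key % 100 WITH ROLLUP
// as if it were written
//   SELECT a AS c1, count(1) FROM (SELECT key % 100 AS a FROM src) t GROUP BY a WITH ROLLUP
// so Expand only ever sees the simple attribute `a`.
val rollup = hiveContext.sql(
  "SELECT key % 100 AS c1, count(1) AS cnt FROM src GROUP BY key % 100 WITH ROLLUP")
rollup.show()
```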

Author: Cheng Hao hao.ch...@intel.com

Closes #7343 from chenghao-intel/expand and squashes the following commits:

1ebbb59 [Cheng Hao] update the comment
827873f [Cheng Hao] update as feedback
34def69 [Cheng Hao] Add more unit test and comments
c695760 [Cheng Hao] fix bug of incorrect result for rollup


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2721231
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2721231
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2721231

Branch: refs/heads/master
Commit: e27212317c7341852c52d9a85137b8f94cb0d935
Parents: ba33096
Author: Cheng Hao hao.ch...@intel.com
Authored: Wed Jul 15 23:35:27 2015 -0700
Committer: Yin Huai yh...@databricks.com
Committed: Wed Jul 15 23:35:27 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 42 +--
 ...r CUBE #1-0-63b61fb3f0e74226001ad279be440864 |  6 +++
 ...r CUBE #2-0-7a511f02a16f0af4f810b1666cfcd896 | 10 
 ...oupingSet-0-8c14c24670a4b06c440346277ce9cf1c | 10 
 ...Rollup #1-0-a78e3dbf242f240249e36b3d3fd0926a |  6 +++
 ...Rollup #2-0-bf180c9d1a18f61b9d9f31bb0115cf89 | 10 
 ...Rollup #3-0-9257085d123728730be96b6d9fbb84ce | 10 
 .../sql/hive/execution/HiveQuerySuite.scala | 54 
 8 files changed, 145 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e2721231/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 891408e..df8e7f2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -194,16 +194,52 @@ class Analyzer(
 }
 
 def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case a if !a.childrenResolved => a // be sure all of the children are resolved.
       case a: Cube =>
         GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
       case a: Rollup =>
         GroupingSets(bitmasks(a), a.groupByExprs, a.child, a.aggregations)
       case x: GroupingSets =>
         val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)()
+        // We will insert another Projection if the GROUP BY keys contains the
+        // non-attribute expressions. And the top operators can references those
+        // expressions by its alias.
+        // e.g. SELECT key%5 as c1 FROM src GROUP BY key%5 ==>
+        //      SELECT a as c1 FROM (SELECT key%5 AS a FROM src) GROUP BY a
+
+        // find all of the non-attribute expressions in the GROUP BY keys
+        val nonAttributeGroupByExpressions = new ArrayBuffer[Alias]()
+
+        // The pair of (the original GROUP BY key, associated attribute)
+        val groupByExprPairs = x.groupByExprs.map(_ match {
+          case e: NamedExpression => (e, e.toAttribute)
+          case other => {
+            val alias = Alias(other, other.toString)()
+            nonAttributeGroupByExpressions += alias // add the non-attributes expression alias
+            (other, alias.toAttribute)
+          }
+        })
+
+        // substitute the non-attribute expressions for aggregations.
+        val aggregation = x.aggregations.map(expr => expr.transformDown {
+          case e => groupByExprPairs.find(_._1.semanticEquals(e)).map(_._2).getOrElse(e)
+        }.asInstanceOf[NamedExpression])
+
+        // substitute the group by expressions.
+        val newGroupByExprs = groupByExprPairs.map(_._2)
+
+        val child = if (nonAttributeGroupByExpressions.length > 0) {
+          // insert additional projection if contains the
+          // non-attribute expressions in the GROUP BY keys
+          Project(x.child.output ++ nonAttributeGroupByExpressions, x.child)
+        } else {
+          x.child
+        }
+
 Aggregate(
-  x.groupByExprs :+ VirtualColumn.groupingIdAttribute,
-  x.aggregations,
-  

spark git commit: [SPARK-8893] Add runtime checks against non-positive number of partitions

2015-07-16 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 0a795336d -> 011551620


[SPARK-8893] Add runtime checks against non-positive number of partitions

https://issues.apache.org/jira/browse/SPARK-8893

 What does `sc.parallelize(1 to 3).repartition(p).collect` return? I would 
 expect `Array(1, 2, 3)` regardless of `p`. But if `p` < 1, it returns 
 `Array()`. I think instead it should throw an `IllegalArgumentException`.

 I think the case is pretty clear for `p` < 0. But the behavior for `p` = 0 is 
 also error prone. In fact that's how I found this strange behavior. I used 
 `rdd.repartition(a/b)` with positive `a` and `b`, but `a/b` was rounded down 
 to zero and the results surprised me. I'd prefer an exception instead of 
 unexpected (corrupt) results.
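
A short sketch of the new behavior (assuming an existing SparkContext named `sc`, e.g. from spark-shell):

```scala
// Illustrates the require() checks this patch adds to HashPartitioner and CoalescedRDD.
val rdd = sc.parallelize(1 to 3)

rdd.repartition(2).collect()   // still returns the three elements

// Before: silently produced an empty result.
// After: fails fast while constructing the CoalescedRDD with
//   java.lang.IllegalArgumentException: requirement failed:
//   Number of partitions (0) must be positive.
rdd.repartition(0)
```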

Author: Daniel Darabos darabos.dan...@gmail.com

Closes #7285 from darabos/patch-1 and squashes the following commits:

decba82 [Daniel Darabos] Allow repartitioning empty RDDs to zero partitions.
97de852 [Daniel Darabos] Allow zero partition count in HashPartitioner
f6ba5fb [Daniel Darabos] Use require() for simpler syntax.
d5e3df8 [Daniel Darabos] Require positive number of partitions in 
HashPartitioner
897c628 [Daniel Darabos] Require positive maxPartitions in CoalescedRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01155162
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01155162
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01155162

Branch: refs/heads/master
Commit: 011551620faa87107a787530f074af3d9be7e695
Parents: 0a79533
Author: Daniel Darabos darabos.dan...@gmail.com
Authored: Thu Jul 16 08:16:54 2015 +0100
Committer: Sean Owen so...@cloudera.com
Committed: Thu Jul 16 08:16:54 2015 +0100

--
 core/src/main/scala/org/apache/spark/Partitioner.scala  | 2 ++
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 5 -
 2 files changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/Partitioner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 82889bc..ad68512 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -76,6 +76,8 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
+  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
   def numPartitions: Int = partitions
 
   def getPartition(key: Any): Int = key match {

http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 663eebb..90d9735 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -69,7 +69,7 @@ private[spark] case class CoalescedRDDPartition(
  * the preferred location of each new partition overlaps with as many 
preferred locations of its
  * parent partitions
  * @param prev RDD to be coalesced
- * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
@@ -78,6 +78,9 @@ private[spark] class CoalescedRDD[T: ClassTag](
 balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement 
getDependencies
 
+  require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
+    s"Number of partitions ($maxPartitions) must be positive.")
+
   override def getPartitions: Array[Partition] = {
 val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
 





spark git commit: [SPARK-8995] [SQL] cast date strings like '2015-01-01 12:15:31' to date

2015-07-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 011551620 -> 4ea6480a3


[SPARK-8995] [SQL] cast date strings like '2015-01-01 12:15:31' to date

Jira https://issues.apache.org/jira/browse/SPARK-8995

In PR #6981 we noticed that we cannot cast date strings that contain a time, like '2015-03-18 12:39:40', to a date. Besides, it's not possible to cast a string like '18:03:20' to a timestamp.

If a time is passed without a date, today is inferred as the date.
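
A minimal sketch of the casts this enables (assuming an existing SQLContext named `ctx`; not part of the patch):

```scala
import org.apache.spark.sql.types.{DateType, TimestampType}

import ctx.implicits._

val df = ctx.sparkContext.parallelize(Seq("2015-03-18 12:39:40", "18:03:20")).toDF("s")

// After this patch a date string containing a time can be cast to DateType
// (the time part is dropped), and a time-only string can be cast to
// TimestampType (today is inferred as the date). Previously both casts returned null.
df.select($"s".cast(DateType), $"s".cast(TimestampType)).show()
```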

Author: Tarek Auel tarek.a...@googlemail.com
Author: Tarek Auel tarek.a...@gmail.com

Closes #7353 from tarekauel/SPARK-8995 and squashes the following commits:

14f333b [Tarek Auel] [SPARK-8995] added tests for daylight saving time
ca1ae69 [Tarek Auel] [SPARK-8995] style fix
d20b8b4 [Tarek Auel] [SPARK-8995] bug fix: distinguish between 0 and null
ef05753 [Tarek Auel] [SPARK-8995] added check for year = 1000
01c9ff3 [Tarek Auel] [SPARK-8995] support for time strings
34ec573 [Tarek Auel] fixed style
71622c0 [Tarek Auel] improved timestamp and date parsing
0e30c0a [Tarek Auel] Hive compatibility
cfbaed7 [Tarek Auel] fixed wrong checks
71f89c1 [Tarek Auel] [SPARK-8995] minor style fix
f7452fa [Tarek Auel] [SPARK-8995] removed old timestamp parsing
30e5aec [Tarek Auel] [SPARK-8995] date and timestamp cast
c1083fb [Tarek Auel] [SPARK-8995] cast date strings like '2015-01-01 12:15:31' 
to date or timestamp


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ea6480a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ea6480a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ea6480a

Branch: refs/heads/master
Commit: 4ea6480a3ba4ca7e09089c9b99d4a855894b9015
Parents: 0115516
Author: Tarek Auel tarek.a...@googlemail.com
Authored: Thu Jul 16 08:26:39 2015 -0700
Committer: Davies Liu davies@gmail.com
Committed: Thu Jul 16 08:26:39 2015 -0700

--
 .../spark/sql/catalyst/expressions/Cast.scala   |  17 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 198 +
 .../sql/catalyst/expressions/CastSuite.scala| 144 
 .../sql/catalyst/util/DateTimeUtilsSuite.scala  | 218 +++
 4 files changed, 562 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4ea6480a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ab02add..83d5b3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -167,17 +167,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // TimestampConverter
   private[this] def castToTimestamp(from: DataType): Any => Any = from match {
     case StringType =>
-      buildCast[UTF8String](_, utfs => {
-        // Throw away extra if more than 9 decimal places
-        val s = utfs.toString
-        val periodIdx = s.indexOf(".")
-        var n = s
-        if (periodIdx != -1 && n.length() - periodIdx > 9) {
-          n = n.substring(0, periodIdx + 10)
-        }
-        try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
-        catch { case _: java.lang.IllegalArgumentException => null }
-      })
+      buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs).orNull)
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1L else 0)
     case LongType =>
@@ -220,10 +210,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // DateConverter
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
-      buildCast[UTF8String](_, s =>
-        try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
-        catch { case _: java.lang.IllegalArgumentException => null }
-      )
+      buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull)
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.

http://git-wip-us.apache.org/repos/asf/spark/blob/4ea6480a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index c1ddee3..53c32a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 

spark git commit: [SPARK-8646] PySpark does not run on YARN if master not provided in command line

2015-07-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 57e9b13bf -> 49351c7f5


[SPARK-8646] PySpark does not run on YARN if master not provided in command line

andrewor14 davies vanzin can you take a look at this? thanks

Author: Lianhui Wang lianhuiwan...@gmail.com

Closes #7438 from lianhuiwang/SPARK-8646 and squashes the following commits:

cb3f12d [Lianhui Wang] add whitespace
6d874a6 [Lianhui Wang] support pyspark for yarn-client


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49351c7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49351c7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49351c7f

Branch: refs/heads/master
Commit: 49351c7f597c67950cc65e5014a89fad31b9a6f7
Parents: 57e9b13
Author: Lianhui Wang lianhuiwan...@gmail.com
Authored: Thu Jul 16 19:31:14 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Jul 16 19:31:45 2015 -0700

--
 python/pyspark/context.py | 5 +
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/49351c7f/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d746672..43bde5a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -152,6 +152,11 @@ class SparkContext(object):
         self.master = self._conf.get("spark.master")
         self.appName = self._conf.get("spark.app.name")
         self.sparkHome = self._conf.get("spark.home", None)
+
+        # Let YARN know it's a pyspark app, so it distributes needed libraries.
+        if self.master == "yarn-client":
+            self._conf.set("spark.yarn.isPython", "true")
+
         for (k, v) in self._conf.getAll():
             if k.startswith("spark.executorEnv."):
                 varName = k[len("spark.executorEnv."):]

http://git-wip-us.apache.org/repos/asf/spark/blob/49351c7f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f86b6d1..b74ea9a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -616,7 +616,7 @@ private[spark] class Client(
 val appId = newAppResponse.getApplicationId
 val appStagingDir = getAppStagingDir(appId)
 val pySparkArchives =
-      if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) {
+      if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
 findPySparkArchives()
   } else {
 Nil





spark git commit: [SPARK-8119] HeartbeatReceiver should replace executors, not kill

2015-07-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master d86bbb4e2 -> 96aa3340f


[SPARK-8119] HeartbeatReceiver should replace executors, not kill

**Symptom.** If an executor in an application times out, `HeartbeatReceiver` 
attempts to kill it. After this happens, however, the application never gets an 
executor back even when there are cluster resources available.

**Cause.** The issue is that `sc.killExecutor` automatically assumes that the 
application wishes to adjust its resource requirements permanently downwards. 
This is not the intention in `HeartbeatReceiver`, however, which simply wants a 
replacement for the expired executor.

**Fix.** Differentiate between the intention to kill and the intention to 
replace an executor with a fresh one. More details can be found in the commit 
message.
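
A sketch of the distinction (the helper and executor id are hypothetical, and `killAndReplaceExecutor` is an internal API, which is why HeartbeatReceiver rather than user code invokes it):

```scala
import org.apache.spark.SparkContext

// Hypothetical helper illustrating the choice made in HeartbeatReceiver.
def expireExecutor(sc: SparkContext, executorId: String): Unit = {
  // sc.killExecutor(executorId) would also lower the application's executor
  // target, so the cluster manager never grants a replacement (the symptom above).
  // The new call kills the executor but leaves the target unchanged, so a fresh
  // executor can be scheduled in its place.
  sc.killAndReplaceExecutor(executorId)
}
```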

Author: Andrew Or and...@databricks.com

Closes #7107 from andrewor14/heartbeat-no-kill and squashes the following 
commits:

1cd2cd7 [Andrew Or] Add regression test for SPARK-8119
25a347d [Andrew Or] Reuse more code in scheduler backend
31ebd40 [Andrew Or] Differentiate between kill and replace


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96aa3340
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96aa3340
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96aa3340

Branch: refs/heads/master
Commit: 96aa3340f41d8de4560caec97e8f3de23252c792
Parents: d86bbb4
Author: Andrew Or and...@databricks.com
Authored: Thu Jul 16 19:39:54 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Jul 16 19:39:54 2015 -0700

--
 .../org/apache/spark/HeartbeatReceiver.scala|   4 +-
 .../scala/org/apache/spark/SparkContext.scala   |  40 -
 .../cluster/CoarseGrainedSchedulerBackend.scala |  40 +++--
 .../apache/spark/HeartbeatReceiverSuite.scala   | 147 ---
 4 files changed, 194 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 221b1da..43dd4a1 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
   // Asynchronously kill the executor to avoid blocking the current 
thread
   killExecutorThread.submit(new Runnable {
 override def run(): Unit = Utils.tryLogNonFatalError {
-  sc.killExecutor(executorId)
+  // Note: we want to get an executor back after expiring this one,
+  // so do not simply call `sc.killExecutor` here (SPARK-8119)
+  sc.killAndReplaceExecutor(executorId)
 }
   })
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/96aa3340/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bd1cc33..d00c012 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   /**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
+   *
+   * Note: This is an indication to the cluster manager that the application 
wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the 
executors it kills
+   * through this method with new ones, it should follow up explicitly with a 
call to
+   * {{SparkContext#requestExecutors}}.
+   *
* This is currently only supported in YARN mode. Return whether the request 
is received.
*/
   @DeveloperApi
@@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   /**
* :: DeveloperApi ::
-   * Request that cluster manager the kill the specified executor.
-   * This is currently only supported in Yarn mode. Return whether the request 
is received.
+   * Request that the cluster manager kill the specified executor.
+   *
+   * Note: This is an indication to the cluster manager that the application 
wishes to adjust
+   * its resource usage downwards. If the application wishes to replace the 
executor it kills
+   * through this method with a new one, it should follow up explicitly with a 
call to
+   * {{SparkContext#requestExecutors}}.
+   *
+   

spark git commit: [SPARK-6284] [MESOS] Add mesos role, principal and secret

2015-07-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 49351c7f5 -> d86bbb4e2


[SPARK-6284] [MESOS] Add mesos role, principal and secret

Mesos supports setting a framework authentication principal/secret and a role per framework. The role identifies the framework and affects its sharing weight in resource allocation, while the optional authentication information allows the framework to connect to the master.
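
A configuration sketch: the property names below are understood to be the ones this change introduces (docs/running-on-mesos.md is updated in this patch); the values and master URL are hypothetical:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://zk://mesos-master:2181/mesos")  // hypothetical Mesos master URL
  .set("spark.mesos.role", "spark")                   // framework role, affects sharing weight
  .set("spark.mesos.principal", "spark-principal")    // framework authentication principal
  .set("spark.mesos.secret", "spark-secret")          // framework authentication secret
```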

Author: Timothy Chen tnac...@gmail.com

Closes #4960 from tnachen/mesos_fw_auth and squashes the following commits:

0f9f03e [Timothy Chen] Fix review comments.
8f9488a [Timothy Chen] Fix rebase
f7fc2a9 [Timothy Chen] Add mesos role, auth and secret.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d86bbb4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d86bbb4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d86bbb4e

Branch: refs/heads/master
Commit: d86bbb4e286f16f77ba125452b07827684eafeed
Parents: 49351c7
Author: Timothy Chen tnac...@gmail.com
Authored: Thu Jul 16 19:36:45 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Thu Jul 16 19:37:15 2015 -0700

--
 .../mesos/CoarseMesosSchedulerBackend.scala |  35 +++---
 .../cluster/mesos/MesosClusterScheduler.scala   |  28 ++---
 .../cluster/mesos/MesosSchedulerBackend.scala   | 118 ++---
 .../cluster/mesos/MesosSchedulerUtils.scala | 126 ---
 .../CoarseMesosSchedulerBackendSuite.scala  |  19 ++-
 .../mesos/MesosSchedulerBackendSuite.scala  | 106 +++-
 docs/running-on-mesos.md|  22 
 7 files changed, 358 insertions(+), 96 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d86bbb4e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cbade13..b7fde0d 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList, Collections}
 import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
@@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses coarse-grained 
tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   /**
* The total number of executors we aim to have. Undefined when not using 
dynamic allocation
-   * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
+   * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
*/
   private var executorLimitOption: Option[Int] = None
 
@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def start() {
 super.start()
-val fwInfo = 
FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
-startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
+val driver = createSchedulerDriver(
+  master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, 
sc.conf)
+startScheduler(driver)
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
   taskIdToSlaveId(taskId) = slaveId
   slaveIdsWithExecutors += slaveId
   coresByTaskId(taskId) = cpusToUse
-  val task = MesosTaskInfo.newBuilder()
+      // Gather cpu resources from the available resources and use them in the task.
+      val (remainingResources, cpuResourcesToUse) =
+        partitionResources(offer.getResourcesList, "cpus", cpusToUse)

spark git commit: [SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.

2015-07-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master fec10f0c6 -> 031d7d414


[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.

Author: jerryshao saisai.s...@intel.com
Author: Saisai Shao saisai.s...@intel.com

Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/031d7d41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/031d7d41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/031d7d41

Branch: refs/heads/master
Commit: 031d7d41430ec1f3c3353e33eab4821a9bcd58a5
Parents: fec10f0
Author: jerryshao saisai.s...@intel.com
Authored: Thu Jul 16 16:55:46 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Jul 16 16:55:46 2015 -0700

--
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +
 .../spark/streaming/CheckpointSuite.scala   | 45 +++-
 2 files changed, 46 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5279331..65d4e93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val 
checkpointTime: Time)
 // Reload properties for the checkpoint application since user wants to 
set a reload property
 // or spark had changed its value and user wants to set it back.
     val propertiesToReload = List(
+      "spark.driver.host",
+      "spark.driver.port",
       "spark.master",
       "spark.yarn.keytab",
       "spark.yarn.principal")

http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6a94928..d308ac0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  // This tests if spark.driver.host and spark.driver.port is set by user, 
can be recovered
+  // with correct value.
+  test(get correct spark.driver.[host|port] from checkpoint) {
+val conf = Map(spark.driver.host - localhost, spark.driver.port - 
)
+    conf.foreach(kv => System.setProperty(kv._1, kv._2))
+ssc = new StreamingContext(master, framework, batchDuration)
+val originalConf = ssc.conf
+assert(originalConf.get(spark.driver.host) === localhost)
+assert(originalConf.get(spark.driver.port) === )
+
+val cp = new Checkpoint(ssc, Time(1000))
+ssc.stop()
+
+// Serialize/deserialize to simulate write to storage and reading it back
+val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+
+val newCpConf = newCp.createSparkConf()
+assert(newCpConf.contains(spark.driver.host))
+assert(newCpConf.contains(spark.driver.port))
+assert(newCpConf.get(spark.driver.host) === localhost)
+assert(newCpConf.get(spark.driver.port) === )
+
+// Check if all the parameters have been restored
+ssc = new StreamingContext(null, newCp, null)
+val restoredConf = ssc.conf
+assert(restoredConf.get(spark.driver.host) === localhost)
+assert(restoredConf.get(spark.driver.port) === )
+ssc.stop()
+
+// If spark.driver.host and spark.driver.host is not set in system 
property, these two
+// parameters should not be presented in the newly recovered conf.
+    conf.foreach(kv => System.clearProperty(kv._1))
+val newCpConf1 = newCp.createSparkConf()
+assert(!newCpConf1.contains(spark.driver.host))
+assert(!newCpConf1.contains(spark.driver.port))
+
+// Spark itself will dispatch a random, not-used port for 
spark.driver.port if it is not set
+// explicitly.
+ssc = new StreamingContext(null, newCp, null)
+val restoredConf1 = ssc.conf
+assert(restoredConf1.get(spark.driver.host) === 

spark git commit: [SPARK-8644] Include call site in SparkException stack traces thrown by job failures

2015-07-16 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 031d7d414 -> 57e9b13bf


[SPARK-8644] Include call site in SparkException stack traces thrown by job 
failures

Example exception (new part at bottom, clearly demarcated):

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 
0, localhost): java.lang.RuntimeException: uh-oh!
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1640)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1298)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1289)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1288)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1288)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:755)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1509)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1470)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1459)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:560)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1744)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1762)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1791)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply$mcJ$sp(DAGSchedulerSuite.scala:880)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880)
at org.scalatest.Assertions$class.intercept(Assertions.scala:997)
at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply$mcV$sp(DAGSchedulerSuite.scala:879)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at