spark git commit: [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile

2015-12-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master db5165246 -> eeb58722a


[SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile

SparkR supports ```read.parquet``` and deprecates ```parquetFile```. This change 
is similar to #10145 for ```jsonFile```.

Author: Yanbo Liang 

Closes #10191 from yanboliang/spark-12198.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeb58722
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeb58722
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeb58722

Branch: refs/heads/master
Commit: eeb58722ad73441eeb5f35f864be3c5392cfd426
Parents: db51652
Author: Yanbo Liang 
Authored: Thu Dec 10 09:44:53 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Thu Dec 10 09:44:53 2015 -0800

--
 R/pkg/NAMESPACE   |  1 +
 R/pkg/R/SQLContext.R  | 16 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++
 3 files changed, 22 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 565a2b1..ba64bc5 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -270,6 +270,7 @@ export("as.DataFrame",
"loadDF",
"parquetFile",
"read.df",
+   "read.parquet",
"sql",
"table",
"tableNames",

http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 85541c8..f678c70 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -256,18 +256,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, 
samplingRatio = 1.0) {
   }
 }
 
-
 #' Create a DataFrame from a Parquet file.
 #'
 #' Loads a Parquet file, returning the result as a DataFrame.
 #'
 #' @param sqlContext SQLContext to use
-#' @param ... Path(s) of parquet file(s) to read.
+#' @param path Path of file to read. A vector of multiple paths is allowed.
 #' @return DataFrame
+#' @rdname read.parquet
+#' @name read.parquet
 #' @export
+read.parquet <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definiton of the text file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "parquet", paths)
+  dataFrame(sdf)
+}
 
+#' @rdname read.parquet
+#' @name parquetFile
+#' @export
 # TODO: Implement saveasParquetFile and write examples for both
 parquetFile <- function(sqlContext, ...) {
+  .Deprecated("read.parquet")
   # Allow the user to have a more flexible definiton of the text file path
   paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
   sdf <- callJMethod(sqlContext, "parquetFile", paths)

http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 39fc94a..222c04a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1420,22 +1420,25 @@ test_that("mutate(), transform(), rename() and 
names()", {
   detach(airquality)
 })
 
-test_that("write.df() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with read.parquet", {
   df <- jsonFile(sqlContext, jsonPath)
   write.df(df, parquetPath, "parquet", mode="overwrite")
-  parquetDF <- parquetFile(sqlContext, parquetPath)
+  parquetDF <- read.parquet(sqlContext, parquetPath)
   expect_is(parquetDF, "DataFrame")
   expect_equal(count(df), count(parquetDF))
 })
 
-test_that("parquetFile works with multiple input paths", {
+test_that("read.parquet()/parquetFile() works with multiple input paths", {
   df <- jsonFile(sqlContext, jsonPath)
   write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
   write.df(df, parquetPath2, "parquet", mode="overwrite")
-  parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2)
+  parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
   expect_is(parquetDF, "DataFrame")
   expect_equal(count(parquetDF), count(df) * 2)
+  parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, 
parquetPath2))
+  expect_is(parquetDF2, "DataFrame")
+  expect_equal(count(parquetDF2), count(df) * 2)
 
   # Test if varargs works with variables
   saveMode <- "overwrite"



spark git commit: [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit

2015-12-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master eeb58722a -> 9fba9c800


[SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit

jira: https://issues.apache.org/jira/browse/SPARK-11602

Made a pass on the API change of 1.6. Open the PR for efficient discussion.

Author: Yuhao Yang 

Closes #9939 from hhbyyh/auditScala.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fba9c80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fba9c80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fba9c80

Branch: refs/heads/master
Commit: 9fba9c8004d2b97549e5456fa7918965bec27336
Parents: eeb5872
Author: Yuhao Yang 
Authored: Thu Dec 10 10:15:50 2015 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 10 10:15:50 2015 -0800

--
 .../src/main/scala/org/apache/spark/ml/feature/package-info.java | 2 +-
 .../scala/org/apache/spark/ml/regression/LinearRegression.scala  | 2 +-
 .../org/apache/spark/mllib/clustering/BisectingKMeans.scala  | 2 +-
 .../org/apache/spark/mllib/clustering/BisectingKMeansModel.scala | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java 
b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
index c22d2e0..7a35f2d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
@@ -23,7 +23,7 @@
  * features into more suitable forms for model fitting.
  * Most feature transformers are implemented as {@link 
org.apache.spark.ml.Transformer}s, which
  * transforms one {@link org.apache.spark.sql.DataFrame} into another, e.g.,
- * {@link org.apache.spark.feature.HashingTF}.
+ * {@link org.apache.spark.ml.feature.HashingTF}.
  * Some feature transformers are implemented as {@link 
org.apache.spark.ml.Estimator}}s, because the
  * transformation requires some aggregated information of the dataset, e.g., 
document
  * frequencies in {@link org.apache.spark.ml.feature.IDF}.

http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 1db9166..5e58509 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -529,7 +529,7 @@ class LinearRegressionSummary private[regression] (
 val predictionCol: String,
 val labelCol: String,
 val model: LinearRegressionModel,
-val diagInvAtWA: Array[Double]) extends Serializable {
+private val diagInvAtWA: Array[Double]) extends Serializable {
 
   @transient private val metrics = new RegressionMetrics(
 predictions

http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index 82adfa6..54bf510 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -407,7 +407,7 @@ private object BisectingKMeans extends Serializable {
  */
 @Since("1.6.0")
 @Experimental
-class ClusteringTreeNode private[clustering] (
+private[clustering] class ClusteringTreeNode private[clustering] (
 val index: Int,
 val size: Long,
 private val centerWithNorm: VectorWithNorm,

http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index f942e56..9ccf96b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -32,8 +32,8 @@ import org.apache.spark.rdd.RDD
  */
 

spark git commit: [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema

2015-12-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master d9d354ed4 -> bc5f56aa6


[SPARK-12250][SQL] Allow users to define a UDAF without providing details of 
its inputSchema

https://issues.apache.org/jira/browse/SPARK-12250

Author: Yin Huai 

Closes #10236 from yhuai/SPARK-12250.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc5f56aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc5f56aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc5f56aa

Branch: refs/heads/master
Commit: bc5f56aa60a430244ffa0cacd81c0b1ecbf8d68f
Parents: d9d354e
Author: Yin Huai 
Authored: Thu Dec 10 12:03:29 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 10 12:03:29 2015 -0800

--
 .../spark/sql/execution/aggregate/udaf.scala|  5 --
 .../hive/execution/AggregationQuerySuite.scala  | 64 
 2 files changed, 64 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc5f56aa/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 20359c1..c0d0010 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -332,11 +332,6 @@ private[sql] case class ScalaUDAF(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  require(
-children.length == udaf.inputSchema.length,
-s"$udaf only accepts ${udaf.inputSchema.length} arguments, " +
-  s"but ${children.length} are provided.")
-
   override def nullable: Boolean = true
 
   override def dataType: DataType = udaf.dataType

http://git-wip-us.apache.org/repos/asf/spark/blob/bc5f56aa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 39c0a2a..064c000 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -66,6 +66,33 @@ class ScalaAggregateFunction(schema: StructType) extends 
UserDefinedAggregateFun
   }
 }
 
+class ScalaAggregateFunctionWithoutInputSchema extends 
UserDefinedAggregateFunction {
+
+  def inputSchema: StructType = StructType(Nil)
+
+  def bufferSchema: StructType = StructType(StructField("value", LongType) :: 
Nil)
+
+  def dataType: DataType = LongType
+
+  def deterministic: Boolean = true
+
+  def initialize(buffer: MutableAggregationBuffer): Unit = {
+buffer.update(0, 0L)
+  }
+
+  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+buffer.update(0, input.getAs[Seq[Row]](0).map(_.getAs[Int]("v")).sum + 
buffer.getLong(0))
+  }
+
+  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
+  }
+
+  def evaluate(buffer: Row): Any = {
+buffer.getLong(0)
+  }
+}
+
 class LongProductSum extends UserDefinedAggregateFunction {
   def inputSchema: StructType = new StructType()
 .add("a", LongType)
@@ -858,6 +885,43 @@ abstract class AggregationQuerySuite extends QueryTest 
with SQLTestUtils with Te
   )
 }
   }
+
+  test("udaf without specifying inputSchema") {
+withTempTable("noInputSchemaUDAF") {
+  sqlContext.udf.register("noInputSchema", new 
ScalaAggregateFunctionWithoutInputSchema)
+
+  val data =
+Row(1, Seq(Row(1), Row(2), Row(3))) ::
+  Row(1, Seq(Row(4), Row(5), Row(6))) ::
+  Row(2, Seq(Row(-10))) :: Nil
+  val schema =
+StructType(
+  StructField("key", IntegerType) ::
+StructField("myArray",
+  ArrayType(StructType(StructField("v", IntegerType) :: Nil))) :: 
Nil)
+  sqlContext.createDataFrame(
+sparkContext.parallelize(data, 2),
+schema)
+.registerTempTable("noInputSchemaUDAF")
+
+  checkAnswer(
+sqlContext.sql(
+  """
+|SELECT key, noInputSchema(myArray)
+|FROM noInputSchemaUDAF
+|GROUP BY key
+  """.stripMargin),
+Row(1, 21) :: Row(2, -10) :: Nil)
+
+  checkAnswer(
+sqlContext.sql(
+  

spark git commit: [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema

2015-12-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e541f703d -> 594fafc61


[SPARK-12250][SQL] Allow users to define a UDAF without providing details of 
its inputSchema

https://issues.apache.org/jira/browse/SPARK-12250

Author: Yin Huai 

Closes #10236 from yhuai/SPARK-12250.

(cherry picked from commit bc5f56aa60a430244ffa0cacd81c0b1ecbf8d68f)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/594fafc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/594fafc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/594fafc6

Branch: refs/heads/branch-1.6
Commit: 594fafc6122ad9c6b24bdb4a434d97158c7745f3
Parents: e541f70
Author: Yin Huai 
Authored: Thu Dec 10 12:03:29 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 10 12:03:40 2015 -0800

--
 .../spark/sql/execution/aggregate/udaf.scala|  5 --
 .../hive/execution/AggregationQuerySuite.scala  | 64 
 2 files changed, 64 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/594fafc6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 20359c1..c0d0010 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -332,11 +332,6 @@ private[sql] case class ScalaUDAF(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
 copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  require(
-children.length == udaf.inputSchema.length,
-s"$udaf only accepts ${udaf.inputSchema.length} arguments, " +
-  s"but ${children.length} are provided.")
-
   override def nullable: Boolean = true
 
   override def dataType: DataType = udaf.dataType

http://git-wip-us.apache.org/repos/asf/spark/blob/594fafc6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 39c0a2a..064c000 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -66,6 +66,33 @@ class ScalaAggregateFunction(schema: StructType) extends 
UserDefinedAggregateFun
   }
 }
 
+class ScalaAggregateFunctionWithoutInputSchema extends 
UserDefinedAggregateFunction {
+
+  def inputSchema: StructType = StructType(Nil)
+
+  def bufferSchema: StructType = StructType(StructField("value", LongType) :: 
Nil)
+
+  def dataType: DataType = LongType
+
+  def deterministic: Boolean = true
+
+  def initialize(buffer: MutableAggregationBuffer): Unit = {
+buffer.update(0, 0L)
+  }
+
+  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+buffer.update(0, input.getAs[Seq[Row]](0).map(_.getAs[Int]("v")).sum + 
buffer.getLong(0))
+  }
+
+  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
+  }
+
+  def evaluate(buffer: Row): Any = {
+buffer.getLong(0)
+  }
+}
+
 class LongProductSum extends UserDefinedAggregateFunction {
   def inputSchema: StructType = new StructType()
 .add("a", LongType)
@@ -858,6 +885,43 @@ abstract class AggregationQuerySuite extends QueryTest 
with SQLTestUtils with Te
   )
 }
   }
+
+  test("udaf without specifying inputSchema") {
+withTempTable("noInputSchemaUDAF") {
+  sqlContext.udf.register("noInputSchema", new 
ScalaAggregateFunctionWithoutInputSchema)
+
+  val data =
+Row(1, Seq(Row(1), Row(2), Row(3))) ::
+  Row(1, Seq(Row(4), Row(5), Row(6))) ::
+  Row(2, Seq(Row(-10))) :: Nil
+  val schema =
+StructType(
+  StructField("key", IntegerType) ::
+StructField("myArray",
+  ArrayType(StructType(StructField("v", IntegerType) :: Nil))) :: 
Nil)
+  sqlContext.createDataFrame(
+sparkContext.parallelize(data, 2),
+schema)
+.registerTempTable("noInputSchemaUDAF")
+
+  checkAnswer(
+sqlContext.sql(
+  """
+|SELECT key, noInputSchema(myArray)
+|FROM noInputSchemaUDAF
+|GROUP BY 

spark git commit: [SPARK-12228][SQL] Try to run execution hive's derby in memory.

2015-12-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master bc5f56aa6 -> ec5f9ed5d


[SPARK-12228][SQL] Try to run execution hive's derby in memory.

This PR tries to make execution hive's derby run in memory since it is a fake 
metastore and every time we create a HiveContext, we will switch to a new one. 
It is possible that it can reduce the flakiness of our tests that need to 
create HiveContext (e.g. HiveSparkSubmitSuite). I will test it more.

https://issues.apache.org/jira/browse/SPARK-12228

Author: Yin Huai 

Closes #10204 from yhuai/derbyInMemory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec5f9ed5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec5f9ed5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec5f9ed5

Branch: refs/heads/master
Commit: ec5f9ed5de2218938dba52152475daafd4dc4786
Parents: bc5f56a
Author: Yin Huai 
Authored: Thu Dec 10 12:04:20 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 10 12:04:20 2015 -0800

--
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala  | 2 +-
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala   | 8 +---
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +-
 .../org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 ++
 4 files changed, 9 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 4b928e6..03bb2c2 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -83,7 +83,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
 val cliConf = new HiveConf(classOf[SessionState])
 // Override the location of the metastore since this is only used for 
local execution.
-HiveContext.newTemporaryConfiguration().foreach {
+HiveContext.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
   case (key, value) => cliConf.set(key, value)
 }
 val sessionState = new CliSessionState(cliConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e83941c..5958777 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -212,7 +212,7 @@ class HiveContext private[hive](
 val loader = new IsolatedClientLoader(
   version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
   execJars = Seq(),
-  config = newTemporaryConfiguration(),
+  config = newTemporaryConfiguration(useInMemoryDerby = true),
   isolationOn = false,
   baseClassLoader = Utils.getContextOrSparkClassLoader)
 loader.createClient().asInstanceOf[ClientWrapper]
@@ -721,7 +721,9 @@ private[hive] object HiveContext {
 doc = "TODO")
 
   /** Constructs a configuration for hive, where the metastore is located in a 
temp directory. */
-  def newTemporaryConfiguration(): Map[String, String] = {
+  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, 
String] = {
+val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""
+
 val tempDir = Utils.createTempDir()
 val localMetastore = new File(tempDir, "metastore")
 val propMap: HashMap[String, String] = HashMap()
@@ -735,7 +737,7 @@ private[hive] object HiveContext {
 }
 propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
localMetastore.toURI.toString)
 propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
-  
s"jdbc:derby:;databaseName=${localMetastore.getAbsolutePath};create=true")
+  
s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
 propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
   "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

[1/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.

2015-12-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master ec5f9ed5d -> 2ecbe02d5


http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/mllib-clustering.md
--
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 8fbced6..48d64cd 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -1,7 +1,7 @@
 ---
 layout: global
-title: Clustering - MLlib
-displayTitle: MLlib - Clustering
+title: Clustering - spark.mllib
+displayTitle: Clustering - spark.mllib
 ---
 
 [Clustering](https://en.wikipedia.org/wiki/Cluster_analysis) is an 
unsupervised learning problem whereby we aim to group subsets
@@ -10,19 +10,19 @@ often used for exploratory analysis and/or as a component 
of a hierarchical
 [supervised learning](https://en.wikipedia.org/wiki/Supervised_learning) 
pipeline (in which distinct classifiers or regression
 models are trained for each cluster).
 
-MLlib supports the following models:
+The `spark.mllib` package supports the following models:
 
 * Table of contents
 {:toc}
 
 ## K-means
 
-[k-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the
+[K-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the
 most commonly used clustering algorithms that clusters the data points into a
-predefined number of clusters. The MLlib implementation includes a parallelized
+predefined number of clusters. The `spark.mllib` implementation includes a 
parallelized
 variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
 called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
-The implementation in MLlib has the following parameters:
+The implementation in `spark.mllib` has the following parameters:
 
 * *k* is the number of desired clusters.
 * *maxIterations* is the maximum number of iterations to run.
@@ -171,7 +171,7 @@ sameModel = KMeansModel.load(sc, "myModelPath")
 
 A [Gaussian Mixture 
Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model)
 represents a composite distribution whereby points are drawn from one of *k* 
Gaussian sub-distributions,
-each with its own probability.  The MLlib implementation uses the
+each with its own probability.  The `spark.mllib` implementation uses the
 
[expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
  algorithm to induce the maximum-likelihood model given a set of samples.  The 
implementation
 has the following parameters:
@@ -308,13 +308,13 @@ graph given pairwise similarties as edge properties,
 described in [Lin and Cohen, Power Iteration 
Clustering](http://www.icml2010.org/papers/387.pdf).
 It computes a pseudo-eigenvector of the normalized affinity matrix of the 
graph via
 [power iteration](http://en.wikipedia.org/wiki/Power_iteration)  and uses it 
to cluster vertices.
-MLlib includes an implementation of PIC using GraphX as its backend.
+`spark.mllib` includes an implementation of PIC using GraphX as its backend.
 It takes an `RDD` of `(srcId, dstId, similarity)` tuples and outputs a model 
with the clustering assignments.
 The similarities must be nonnegative.
 PIC assumes that the similarity measure is symmetric.
 A pair `(srcId, dstId)` regardless of the ordering should appear at most once 
in the input data.
 If a pair is missing from input, their similarity is treated as zero.
-MLlib's PIC implementation takes the following (hyper-)parameters:
+`spark.mllib`'s PIC implementation takes the following (hyper-)parameters:
 
 * `k`: number of clusters
 * `maxIterations`: maximum number of power iterations
@@ -323,7 +323,7 @@ MLlib's PIC implementation takes the following 
(hyper-)parameters:
 
 **Examples**
 
-In the following, we show code snippets to demonstrate how to use PIC in MLlib.
+In the following, we show code snippets to demonstrate how to use PIC in 
`spark.mllib`.
 
 
 
@@ -493,7 +493,7 @@ checkpointing can help reduce shuffle file sizes on disk 
and help with
 failure recovery.
 
 
-All of MLlib's LDA models support:
+All of `spark.mllib`'s LDA models support:
 
 * `describeTopics`: Returns topics as arrays of most important terms and
 term weights
@@ -721,7 +721,7 @@ sameModel = LDAModel.load(sc, "myModelPath")
 ## Streaming k-means
 
 When data arrive in a stream, we may want to estimate clusters dynamically,
-updating them as new data arrive. MLlib provides support for streaming k-means 
clustering,
+updating them as new data arrive. `spark.mllib` provides support for streaming 
k-means clustering,
 with parameters to control the decay (or "forgetfulness") of the estimates. 
The algorithm
 uses a generalization of the mini-batch k-means update rule. For each batch of 
data, we assign
 all points to their nearest cluster, compute new cluster centers, then update 
each cluster using:

http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/mllib-collaborative-filtering.md

[2/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.

2015-12-10 Thread jkbradley
[SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib 
and mllib in the documentation.

Replaces a number of occurrences of `MLlib` in the documentation that were meant 
to refer to the `spark.mllib` package instead. It should clarify for new users 
the difference between `spark.mllib` (the package) and MLlib (the umbrella 
project for ML in spark).

It also removes some files that I forgot to delete with #10207

Author: Timothy Hunter 

Closes #10234 from thunterdb/12212.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ecbe02d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ecbe02d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ecbe02d

Branch: refs/heads/master
Commit: 2ecbe02d5b28ee562d10c1735244b90a08532c9e
Parents: ec5f9ed
Author: Timothy Hunter 
Authored: Thu Dec 10 12:50:46 2015 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 10 12:50:46 2015 -0800

--
 docs/_data/menu-ml.yaml |   2 +-
 docs/_includes/nav-left-wrapper-ml.html |   4 +-
 docs/ml-advanced.md |   2 +-
 docs/ml-ann.md  |   8 +
 docs/ml-classification-regression.md|  25 +-
 docs/ml-clustering.md   |   4 +-
 docs/ml-decision-tree.md| 171 +
 docs/ml-ensembles.md| 319 +
 docs/ml-features.md |   4 +-
 docs/ml-guide.md|  19 +-
 docs/ml-intro.md| 941 ---
 docs/ml-linear-methods.md   | 148 +
 docs/ml-survival-regression.md  |  96 +--
 docs/mllib-classification-regression.md |   6 +-
 docs/mllib-clustering.md|  24 +-
 docs/mllib-collaborative-filtering.md   |  14 +-
 docs/mllib-data-types.md|   2 +-
 docs/mllib-decision-tree.md |   6 +-
 docs/mllib-dimensionality-reduction.md  |  10 +-
 docs/mllib-ensembles.md |  16 +-
 docs/mllib-evaluation-metrics.md|  10 +-
 docs/mllib-feature-extraction.md|  12 +-
 docs/mllib-frequent-pattern-mining.md   |  12 +-
 docs/mllib-guide.md |   2 +-
 docs/mllib-isotonic-regression.md   |   6 +-
 docs/mllib-linear-methods.md|  31 +-
 docs/mllib-migration-guides.md  |   4 +-
 docs/mllib-naive-bayes.md   |   6 +-
 docs/mllib-optimization.md  |   8 +-
 docs/mllib-pmml-model-export.md |  12 +-
 docs/mllib-statistics.md|  18 +-
 31 files changed, 149 insertions(+), 1793 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/_data/menu-ml.yaml
--
diff --git a/docs/_data/menu-ml.yaml b/docs/_data/menu-ml.yaml
index fe37d05..2eea9a9 100644
--- a/docs/_data/menu-ml.yaml
+++ b/docs/_data/menu-ml.yaml
@@ -1,5 +1,5 @@
 - text: "Overview: estimators, transformers and pipelines"
-  url: ml-intro.html
+  url: ml-guide.html
 - text: Extracting, transforming and selecting features
   url: ml-features.html
 - text: Classification and Regression

http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/_includes/nav-left-wrapper-ml.html
--
diff --git a/docs/_includes/nav-left-wrapper-ml.html 
b/docs/_includes/nav-left-wrapper-ml.html
index 0103e89..e2d7eda 100644
--- a/docs/_includes/nav-left-wrapper-ml.html
+++ b/docs/_includes/nav-left-wrapper-ml.html
@@ -1,8 +1,8 @@
 
 
-spark.ml package
+spark.ml package
 {% include nav-left.html nav=include.nav-ml %}
-spark.mllib package
+spark.mllib package
 {% include nav-left.html nav=include.nav-mllib %}
 
 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/ml-advanced.md
--
diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md
index b005633..91731d7 100644
--- a/docs/ml-advanced.md
+++ b/docs/ml-advanced.md
@@ -1,7 +1,7 @@
 ---
 layout: global
 title: Advanced topics - spark.ml
-displayTitle: Advanced topics
+displayTitle: Advanced topics - spark.ml
 ---
 
 # Optimization of linear methods

http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/ml-ann.md
--
diff --git a/docs/ml-ann.md b/docs/ml-ann.md
new file mode 100644
index 000..c2d9bd2
--- /dev/null
+++ b/docs/ml-ann.md
@@ -0,0 +1,8 @@
+---
+layout: global
+title: Multilayer perceptron classifier - spark.ml
+displayTitle: Multilayer perceptron classifier - spark.ml
+---
+
+  > This section has 

[2/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.

2015-12-10 Thread jkbradley
[SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib 
and mllib in the documentation.

Replaces a number of occurrences of `MLlib` in the documentation that were meant 
to refer to the `spark.mllib` package instead. It should clarify for new users 
the difference between `spark.mllib` (the package) and MLlib (the umbrella 
project for ML in Spark).

It also removes some files that I forgot to delete with #10207

Author: Timothy Hunter 

Closes #10234 from thunterdb/12212.

(cherry picked from commit 2ecbe02d5b28ee562d10c1735244b90a08532c9e)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0307dea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0307dea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0307dea

Branch: refs/heads/branch-1.6
Commit: d0307deaa29d5fcf1c675f9367c26aa6a3db3fba
Parents: 594fafc
Author: Timothy Hunter 
Authored: Thu Dec 10 12:50:46 2015 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 10 12:51:00 2015 -0800

--
 docs/_data/menu-ml.yaml |   2 +-
 docs/_includes/nav-left-wrapper-ml.html |   4 +-
 docs/ml-advanced.md |   2 +-
 docs/ml-ann.md  |   8 +
 docs/ml-classification-regression.md|  25 +-
 docs/ml-clustering.md   |   4 +-
 docs/ml-decision-tree.md| 171 +
 docs/ml-ensembles.md| 319 +
 docs/ml-features.md |   4 +-
 docs/ml-guide.md|  19 +-
 docs/ml-intro.md| 941 ---
 docs/ml-linear-methods.md   | 148 +
 docs/ml-survival-regression.md  |  96 +--
 docs/mllib-classification-regression.md |   6 +-
 docs/mllib-clustering.md|  24 +-
 docs/mllib-collaborative-filtering.md   |  14 +-
 docs/mllib-data-types.md|   2 +-
 docs/mllib-decision-tree.md |   6 +-
 docs/mllib-dimensionality-reduction.md  |  10 +-
 docs/mllib-ensembles.md |  16 +-
 docs/mllib-evaluation-metrics.md|  10 +-
 docs/mllib-feature-extraction.md|  12 +-
 docs/mllib-frequent-pattern-mining.md   |  12 +-
 docs/mllib-guide.md |   2 +-
 docs/mllib-isotonic-regression.md   |   6 +-
 docs/mllib-linear-methods.md|  31 +-
 docs/mllib-migration-guides.md  |   4 +-
 docs/mllib-naive-bayes.md   |   6 +-
 docs/mllib-optimization.md  |   8 +-
 docs/mllib-pmml-model-export.md |  12 +-
 docs/mllib-statistics.md|  18 +-
 31 files changed, 149 insertions(+), 1793 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/_data/menu-ml.yaml
--
diff --git a/docs/_data/menu-ml.yaml b/docs/_data/menu-ml.yaml
index fe37d05..2eea9a9 100644
--- a/docs/_data/menu-ml.yaml
+++ b/docs/_data/menu-ml.yaml
@@ -1,5 +1,5 @@
 - text: "Overview: estimators, transformers and pipelines"
-  url: ml-intro.html
+  url: ml-guide.html
 - text: Extracting, transforming and selecting features
   url: ml-features.html
 - text: Classification and Regression

http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/_includes/nav-left-wrapper-ml.html
--
diff --git a/docs/_includes/nav-left-wrapper-ml.html 
b/docs/_includes/nav-left-wrapper-ml.html
index 0103e89..e2d7eda 100644
--- a/docs/_includes/nav-left-wrapper-ml.html
+++ b/docs/_includes/nav-left-wrapper-ml.html
@@ -1,8 +1,8 @@
 
 
-spark.ml package
+spark.ml package
 {% include nav-left.html nav=include.nav-ml %}
-spark.mllib package
+spark.mllib package
 {% include nav-left.html nav=include.nav-mllib %}
 
 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/ml-advanced.md
--
diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md
index b005633..91731d7 100644
--- a/docs/ml-advanced.md
+++ b/docs/ml-advanced.md
@@ -1,7 +1,7 @@
 ---
 layout: global
 title: Advanced topics - spark.ml
-displayTitle: Advanced topics
+displayTitle: Advanced topics - spark.ml
 ---
 
 # Optimization of linear methods

http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/ml-ann.md
--
diff --git a/docs/ml-ann.md b/docs/ml-ann.md
new file mode 100644
index 000..c2d9bd2
--- /dev/null
+++ b/docs/ml-ann.md
@@ -0,0 +1,8 @@
+---
+layout: global
+title: 

[1/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.

2015-12-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 594fafc61 -> d0307deaa


http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/mllib-clustering.md
--
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 8fbced6..48d64cd 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -1,7 +1,7 @@
 ---
 layout: global
-title: Clustering - MLlib
-displayTitle: MLlib - Clustering
+title: Clustering - spark.mllib
+displayTitle: Clustering - spark.mllib
 ---
 
 [Clustering](https://en.wikipedia.org/wiki/Cluster_analysis) is an 
unsupervised learning problem whereby we aim to group subsets
@@ -10,19 +10,19 @@ often used for exploratory analysis and/or as a component 
of a hierarchical
 [supervised learning](https://en.wikipedia.org/wiki/Supervised_learning) 
pipeline (in which distinct classifiers or regression
 models are trained for each cluster).
 
-MLlib supports the following models:
+The `spark.mllib` package supports the following models:
 
 * Table of contents
 {:toc}
 
 ## K-means
 
-[k-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the
+[K-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the
 most commonly used clustering algorithms that clusters the data points into a
-predefined number of clusters. The MLlib implementation includes a parallelized
+predefined number of clusters. The `spark.mllib` implementation includes a 
parallelized
 variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
 called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
-The implementation in MLlib has the following parameters:
+The implementation in `spark.mllib` has the following parameters:
 
 * *k* is the number of desired clusters.
 * *maxIterations* is the maximum number of iterations to run.
@@ -171,7 +171,7 @@ sameModel = KMeansModel.load(sc, "myModelPath")
 
 A [Gaussian Mixture 
Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model)
 represents a composite distribution whereby points are drawn from one of *k* 
Gaussian sub-distributions,
-each with its own probability.  The MLlib implementation uses the
+each with its own probability.  The `spark.mllib` implementation uses the
 
[expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
  algorithm to induce the maximum-likelihood model given a set of samples.  The 
implementation
 has the following parameters:
@@ -308,13 +308,13 @@ graph given pairwise similarties as edge properties,
 described in [Lin and Cohen, Power Iteration 
Clustering](http://www.icml2010.org/papers/387.pdf).
 It computes a pseudo-eigenvector of the normalized affinity matrix of the 
graph via
 [power iteration](http://en.wikipedia.org/wiki/Power_iteration)  and uses it 
to cluster vertices.
-MLlib includes an implementation of PIC using GraphX as its backend.
+`spark.mllib` includes an implementation of PIC using GraphX as its backend.
 It takes an `RDD` of `(srcId, dstId, similarity)` tuples and outputs a model 
with the clustering assignments.
 The similarities must be nonnegative.
 PIC assumes that the similarity measure is symmetric.
 A pair `(srcId, dstId)` regardless of the ordering should appear at most once 
in the input data.
 If a pair is missing from input, their similarity is treated as zero.
-MLlib's PIC implementation takes the following (hyper-)parameters:
+`spark.mllib`'s PIC implementation takes the following (hyper-)parameters:
 
 * `k`: number of clusters
 * `maxIterations`: maximum number of power iterations
@@ -323,7 +323,7 @@ MLlib's PIC implementation takes the following 
(hyper-)parameters:
 
 **Examples**
 
-In the following, we show code snippets to demonstrate how to use PIC in MLlib.
+In the following, we show code snippets to demonstrate how to use PIC in 
`spark.mllib`.
 
 
 
@@ -493,7 +493,7 @@ checkpointing can help reduce shuffle file sizes on disk 
and help with
 failure recovery.
 
 
-All of MLlib's LDA models support:
+All of `spark.mllib`'s LDA models support:
 
 * `describeTopics`: Returns topics as arrays of most important terms and
 term weights
@@ -721,7 +721,7 @@ sameModel = LDAModel.load(sc, "myModelPath")
 ## Streaming k-means
 
 When data arrive in a stream, we may want to estimate clusters dynamically,
-updating them as new data arrive. MLlib provides support for streaming k-means 
clustering,
+updating them as new data arrive. `spark.mllib` provides support for streaming 
k-means clustering,
 with parameters to control the decay (or "forgetfulness") of the estimates. 
The algorithm
 uses a generalization of the mini-batch k-means update rule. For each batch of 
data, we assign
 all points to their nearest cluster, compute new cluster centers, then update 
each cluster using:

http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/mllib-collaborative-filtering.md

spark git commit: [SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.

2015-12-10 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 2ecbe02d5 -> 4a46b8859


[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.

This avoids bringing up yet another HTTP server on the driver, and
instead reuses the file server already managed by the driver's
RpcEnv. As a bonus, the repl now inherits the security features of
the network library.

There's also a small change to create the directory for storing classes
under the root temp dir for the application (instead of directly
under java.io.tmpdir).

Author: Marcelo Vanzin 

Closes #9923 from vanzin/SPARK-11563.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a46b885
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a46b885
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a46b885

Branch: refs/heads/master
Commit: 4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c
Parents: 2ecbe02
Author: Marcelo Vanzin 
Authored: Thu Dec 10 13:26:30 2015 -0800
Committer: Marcelo Vanzin 
Committed: Thu Dec 10 13:26:30 2015 -0800

--
 .../scala/org/apache/spark/HttpFileServer.scala |  5 ++
 .../scala/org/apache/spark/HttpServer.scala | 26 +++
 .../scala/org/apache/spark/SparkContext.scala   |  6 +++
 .../org/apache/spark/executor/Executor.scala|  6 +--
 .../scala/org/apache/spark/rpc/RpcEnv.scala | 18 
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |  5 ++
 .../spark/rpc/netty/NettyStreamManager.scala| 22 -
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 25 +-
 docs/configuration.md   |  8 
 docs/security.md|  8 
 .../org/apache/spark/repl/SparkILoop.scala  | 17 ---
 .../org/apache/spark/repl/SparkIMain.scala  | 28 ++--
 .../main/scala/org/apache/spark/repl/Main.scala | 23 +-
 .../apache/spark/repl/ExecutorClassLoader.scala | 36 ---
 .../spark/repl/ExecutorClassLoaderSuite.scala   | 48 
 15 files changed, 183 insertions(+), 98 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a46b885/core/src/main/scala/org/apache/spark/HttpFileServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala 
b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 7cf7bc0..77d8ec9 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -71,6 +71,11 @@ private[spark] class HttpFileServer(
 serverUri + "/jars/" + file.getName
   }
 
+  def addDirectory(path: String, resourceBase: String): String = {
+httpServer.addDirectory(path, resourceBase)
+serverUri + path
+  }
+
   def addFileToDir(file: File, dir: File) : String = {
 // Check whether the file is a directory. If it is, throw a more 
meaningful exception.
 // If we don't catch this, Guava throws a very confusing error message:

http://git-wip-us.apache.org/repos/asf/spark/blob/4a46b885/core/src/main/scala/org/apache/spark/HttpServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala 
b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 8de3a6c..faa3ef3 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -23,10 +23,9 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, 
ConstraintSecurityHandler, HashLoginService}
-
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, 
ResourceHandler}
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, 
ServletHolder}
 import org.eclipse.jetty.util.thread.QueuedThreadPool
 
 import org.apache.spark.util.Utils
@@ -52,6 +51,11 @@ private[spark] class HttpServer(
 
   private var server: Server = null
   private var port: Int = requestedPort
+  private val servlets = {
+val handler = new ServletContextHandler()
+handler.setContextPath("/")
+handler
+  }
 
   def start() {
 if (server != null) {
@@ -65,6 +69,14 @@ private[spark] class HttpServer(
 }
   }
 
+  def addDirectory(contextPath: String, resourceBase: String): Unit = {
+val holder = new ServletHolder()
+holder.setInitParameter("resourceBase", resourceBase)
+holder.setInitParameter("pathInfoOnly", "true")
+

spark git commit: [SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark

2015-12-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 4a46b8859 -> 6a6c1fc5c


[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark

Adds the ability to define an initial state RDD for use with updateStateByKey 
in PySpark. Also adds a unit test and changes the stateful_network_wordcount 
example to use an initial RDD.

Author: Bryan Cutler 

Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a6c1fc5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a6c1fc5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a6c1fc5

Branch: refs/heads/master
Commit: 6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a
Parents: 4a46b88
Author: Bryan Cutler 
Authored: Thu Dec 10 14:21:15 2015 -0800
Committer: Davies Liu 
Committed: Thu Dec 10 14:21:15 2015 -0800

--
 .../streaming/stateful_network_wordcount.py |  5 -
 python/pyspark/streaming/dstream.py | 13 +++--
 python/pyspark/streaming/tests.py   | 20 
 .../streaming/api/python/PythonDStream.scala| 14 --
 4 files changed, 47 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/examples/src/main/python/streaming/stateful_network_wordcount.py
--
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py 
b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 16ef646..f8bbc65 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -44,13 +44,16 @@ if __name__ == "__main__":
 ssc = StreamingContext(sc, 1)
 ssc.checkpoint("checkpoint")
 
+# RDD with initial state (key, value) pairs
+initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
+
 def updateFunc(new_values, last_sum):
 return sum(new_values) + (last_sum or 0)
 
 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 running_counts = lines.flatMap(lambda line: line.split(" "))\
   .map(lambda word: (word, 1))\
-  .updateStateByKey(updateFunc)
+  .updateStateByKey(updateFunc, 
initialRDD=initialStateRDD)
 
 running_counts.pprint()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/python/pyspark/streaming/dstream.py
--
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index acec850..f61137c 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -568,7 +568,7 @@ class DStream(object):
  
self._ssc._jduration(slideDuration))
 return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
 
-def updateStateByKey(self, updateFunc, numPartitions=None):
+def updateStateByKey(self, updateFunc, numPartitions=None, 
initialRDD=None):
 """
 Return a new "state" DStream where the state for each key is updated 
by applying
 the given function on the previous state of the key and the new values 
of the key.
@@ -579,6 +579,9 @@ class DStream(object):
 if numPartitions is None:
 numPartitions = self._sc.defaultParallelism
 
+if initialRDD and not isinstance(initialRDD, RDD):
+initialRDD = self._sc.parallelize(initialRDD)
+
 def reduceFunc(t, a, b):
 if a is None:
 g = b.groupByKey(numPartitions).mapValues(lambda vs: 
(list(vs), None))
@@ -590,7 +593,13 @@ class DStream(object):
 
 jreduceFunc = TransformFunction(self._sc, reduceFunc,
 self._sc.serializer, 
self._jrdd_deserializer)
-dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), 
jreduceFunc)
+if initialRDD:
+initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
+dstream = 
self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
+   initialRDD._jrdd)
+else:
+dstream = 
self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
+
 return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 

spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c7c99857d -> 43f02e41e


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python 
API now has the same functionality as the Scala/Java API, so this change 
updates the description to make it more precise.

zsxwing tdas , please review, thanks a lot.

Author: jerryshao 

Closes #10246 from jerryshao/direct-kafka-doc-update.

(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f02e41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f02e41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f02e41

Branch: refs/heads/branch-1.4
Commit: 43f02e41e36256c2b90ee06d8b380ff2dfc7cabf
Parents: c7c9985
Author: jerryshao 
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 10 15:32:16 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43f02e41/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 79b811c..100f637 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12258][SQL] passing null into ScalaUDF

2015-12-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5d3722f8e -> d09af2cb4


[SPARK-12258][SQL] passing null into ScalaUDF

Check the nullability of the inputs and pass nulls into ScalaUDF.

Closes #10249

Author: Davies Liu 

Closes #10259 from davies/udf_null.

(cherry picked from commit b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d09af2cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d09af2cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d09af2cb

Branch: refs/heads/branch-1.6
Commit: d09af2cb4237cca9ac72aacb9abb822a2982a820
Parents: 5d3722f
Author: Davies Liu 
Authored: Thu Dec 10 17:22:18 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 10 17:22:57 2015 -0800

--
 .../apache/spark/sql/catalyst/expressions/ScalaUDF.scala| 7 +--
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala| 9 +
 2 files changed, 10 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d09af2cb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 03b8922..5deb2f8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -1029,8 +1029,11 @@ case class ScalaUDF(
 // such as IntegerType, its javaType is `int` and the returned type of 
user-defined
 // function is Object. Trying to convert an Object to `int` will cause 
casting exception.
 val evalCode = evals.map(_.code).mkString
-val funcArguments = converterTerms.zip(evals).map {
-  case (converter, eval) => s"$converter.apply(${eval.value})"
+val funcArguments = converterTerms.zipWithIndex.map {
+  case (converter, i) =>
+val eval = evals(i)
+val dt = children(i).dataType
+s"$converter.apply(${eval.isNull} ? null : (${ctx.boxedType(dt)}) 
${eval.value})"
 }.mkString(",")
 val callFunc = s"${ctx.boxedType(ctx.javaType(dataType))} $resultTerm = " +
   s"(${ctx.boxedType(ctx.javaType(dataType))})${catalystConverterTerm}" +

http://git-wip-us.apache.org/repos/asf/spark/blob/d09af2cb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 76e9648..ce4fb23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1131,14 +1131,15 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
   }
 
   test("SPARK-11725: correctly handle null inputs for ScalaUDF") {
-val df = Seq(
+val df = sparkContext.parallelize(Seq(
   new java.lang.Integer(22) -> "John",
-  null.asInstanceOf[java.lang.Integer] -> "Lucy").toDF("age", "name")
+  null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name")
 
+// passing null into the UDF that could handle it
 val boxedUDF = udf[java.lang.Integer, java.lang.Integer] {
-  (i: java.lang.Integer) => if (i == null) null else i * 2
+  (i: java.lang.Integer) => if (i == null) -10 else i * 2
 }
-checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(null) :: Nil)
+checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(-10) :: Nil)
 
 val primitiveUDF = udf((i: Int) => i * 2)
 checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

2015-12-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9870e5c7a -> c247b6a65


[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

**Problem.** In unified memory management, acquiring execution memory may lead 
to eviction of storage memory. However, the space freed from evicting cached 
blocks is distributed among all active tasks. Thus, an incorrect upper bound on 
the execution memory per task can cause the acquisition to fail, leading to 
OOMs and premature spills.

**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, 
`spark.memory.storageFraction` is 0.4, and there are two active tasks. In this 
case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to 
acquire 200B, it will evict 100B of storage but can only acquire 50B because of 
the incorrect cap. For another example, see this [regression 
test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233)
 that I stole from JoshRosen.

**Solution.** Fix the cap on task execution memory. It should take into account 
the space that could have been freed by storage in addition to the current 
amount of memory available to execution. In the example above, the correct cap 
should have been 600B / 2 = 300B.

This patch also guards against the race condition (SPARK-12253):
(1) Existing tasks collectively occupy all execution memory
(2) New task comes in and blocks while existing tasks spill
(3) After tasks finish spilling, another task jumps in and puts in a large 
block, stealing the freed memory
(4) New task still cannot acquire memory and goes back to sleep

Author: Andrew Or 

Closes #10240 from andrewor14/fix-oom.

(cherry picked from commit 5030923ea8bb94ac8fa8e432de9fc7089aa93986)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c247b6a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c247b6a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c247b6a6

Branch: refs/heads/branch-1.6
Commit: c247b6a6546e12e3c6992c40cad1881d56aefd6f
Parents: 9870e5c
Author: Andrew Or 
Authored: Thu Dec 10 15:30:08 2015 -0800
Committer: Andrew Or 
Committed: Thu Dec 10 15:30:14 2015 -0800

--
 .../spark/memory/ExecutionMemoryPool.scala  | 57 ++--
 .../spark/memory/UnifiedMemoryManager.scala | 57 +++-
 .../scala/org/apache/spark/scheduler/Task.scala |  6 +++
 .../memory/UnifiedMemoryManagerSuite.scala  | 25 +
 4 files changed, 114 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c247b6a6/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1a..dbb0ad8 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
* active tasks) before it is forced to spill. This can happen if the number 
of tasks increase
* but an older task had a lot of memory already.
*
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this 
pool. It takes in
+   *  one parameter (Long) that represents the desired 
amount of memory by
+   *  which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable 
size of this pool
+   *   at this given moment. This is not a field 
because the max pool
+   *   size is variable in certain cases. For 
instance, in unified
+   *   memory management, the execution pool can be 
expanded by evicting
+   *   cached blocks, thereby shrinking the storage 
pool.
+   *
* @return the number of bytes granted to the task.
*/
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = 
lock.synchronized {
+  private[memory] def acquireMemory(
+  numBytes: Long,
+  taskAttemptId: Long,
+  maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+  computeMaxPoolSize: () => Long = () => poolSize): Long = 
lock.synchronized {
 assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
 
+// TODO: clean up this clunky method 

spark git commit: [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

2015-12-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 23a9e62ba -> 5030923ea


[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

**Problem.** In unified memory management, acquiring execution memory may lead 
to eviction of storage memory. However, the space freed from evicting cached 
blocks is distributed among all active tasks. Thus, an incorrect upper bound on 
the execution memory per task can cause the acquisition to fail, leading to 
OOMs and premature spills.

**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, 
`spark.memory.storageFraction` is 0.4, and there are two active tasks. In this 
case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to 
acquire 200B, it will evict 100B of storage but can only acquire 50B because of 
the incorrect cap. For another example, see this [regression 
test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233)
 that I stole from JoshRosen.

**Solution.** Fix the cap on task execution memory. It should take into account 
the space that could have been freed by storage in addition to the current 
amount of memory available to execution. In the example above, the correct cap 
should have been 600B / 2 = 300B.

This patch also guards against the race condition (SPARK-12253):
(1) Existing tasks collectively occupy all execution memory
(2) New task comes in and blocks while existing tasks spill
(3) After tasks finish spilling, another task jumps in and puts in a large 
block, stealing the freed memory
(4) New task still cannot acquire memory and goes back to sleep

Author: Andrew Or 

Closes #10240 from andrewor14/fix-oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5030923e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5030923e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5030923e

Branch: refs/heads/master
Commit: 5030923ea8bb94ac8fa8e432de9fc7089aa93986
Parents: 23a9e62
Author: Andrew Or 
Authored: Thu Dec 10 15:30:08 2015 -0800
Committer: Andrew Or 
Committed: Thu Dec 10 15:30:08 2015 -0800

--
 .../spark/memory/ExecutionMemoryPool.scala  | 57 ++--
 .../spark/memory/UnifiedMemoryManager.scala | 57 +++-
 .../scala/org/apache/spark/scheduler/Task.scala |  6 +++
 .../memory/UnifiedMemoryManagerSuite.scala  | 25 +
 4 files changed, 114 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1a..dbb0ad8 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
* active tasks) before it is forced to spill. This can happen if the number 
of tasks increase
* but an older task had a lot of memory already.
*
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this 
pool. It takes in
+   *  one parameter (Long) that represents the desired 
amount of memory by
+   *  which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable 
size of this pool
+   *   at this given moment. This is not a field 
because the max pool
+   *   size is variable in certain cases. For 
instance, in unified
+   *   memory management, the execution pool can be 
expanded by evicting
+   *   cached blocks, thereby shrinking the storage 
pool.
+   *
* @return the number of bytes granted to the task.
*/
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = 
lock.synchronized {
+  private[memory] def acquireMemory(
+  numBytes: Long,
+  taskAttemptId: Long,
+  maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+  computeMaxPoolSize: () => Long = () => poolSize): Long = 
lock.synchronized {
 assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
 
+// TODO: clean up this clunky method signature
+
 // Add this task to the taskMemory map just so we can keep an accurate 
count of the number
 // of active 

spark git commit: [SPARK-12251] Document and improve off-heap memory configurations

2015-12-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d0307deaa -> 9870e5c7a


[SPARK-12251] Document and improve off-heap memory configurations

This patch adds documentation for Spark configurations that affect off-heap 
memory and makes some naming and validation improvements for those configs.

- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is 
fine because this configuration has not shipped in any Spark release yet (it's 
new in Spark 1.6).
- Deprecate `spark.unsafe.offHeap` in favor of a new 
`spark.memory.offHeap.enabled` configuration. The motivation behind this change 
is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting 
`spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. 
After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a 
hard limit on the amount of off-heap memory that it will allocate to tasks. As 
a result, enabling off-heap execution memory without setting 
`spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration 
validation makes this scenario easier to diagnose, helping to avoid user 
confusion.
- Document these configurations on the configuration page.

Author: Josh Rosen 

Closes #10237 from JoshRosen/SPARK-12251.

(cherry picked from commit 23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9870e5c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9870e5c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9870e5c7

Branch: refs/heads/branch-1.6
Commit: 9870e5c7af87190167ca3845ede918671b9420ca
Parents: d0307de
Author: Josh Rosen 
Authored: Thu Dec 10 15:29:04 2015 -0800
Committer: Andrew Or 
Committed: Thu Dec 10 15:29:13 2015 -0800

--
 .../main/scala/org/apache/spark/SparkConf.scala |  4 +++-
 .../org/apache/spark/memory/MemoryManager.scala | 10 --
 .../spark/memory/TaskMemoryManagerSuite.java| 21 
 .../shuffle/sort/PackedRecordPointerSuite.java  |  6 --
 .../sort/ShuffleInMemorySorterSuite.java|  4 ++--
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |  2 +-
 .../map/AbstractBytesToBytesMapSuite.java   |  4 ++--
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  2 +-
 .../unsafe/sort/UnsafeInMemorySorterSuite.java  |  4 ++--
 .../spark/memory/StaticMemoryManagerSuite.scala |  2 +-
 .../memory/UnifiedMemoryManagerSuite.scala  |  2 +-
 docs/configuration.md   | 16 +++
 .../sql/execution/joins/HashedRelation.scala|  6 +-
 .../UnsafeFixedWidthAggregationMapSuite.scala   |  2 +-
 .../execution/UnsafeKVExternalSorterSuite.scala |  2 +-
 15 files changed, 65 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9870e5c7/core/src/main/scala/org/apache/spark/SparkConf.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 19633a3..d3384fb 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
 "spark.streaming.fileStream.minRememberDuration" -> Seq(
   AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
 "spark.yarn.max.executor.failures" -> Seq(
-  AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
+  AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
+"spark.memory.offHeap.enabled" -> Seq(
+  AlternateConfig("spark.unsafe.offHeap", "1.6"))
 )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9870e5c7/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ae9e1ac..e707e27 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager(
 
   storageMemoryPool.incrementPoolSize(storageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
-  
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize",
 0))
+  
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size",
 0))
 
   /**
* Total available memory 

spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4b99f72f7 -> cb0246c93


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python API 
now has the same functionality as the Scala/Java API, so this change updates 
the description to make it more precise.

zsxwing tdas , please review, thanks a lot.

Author: jerryshao 

Closes #10246 from jerryshao/direct-kafka-doc-update.

(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb0246c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb0246c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb0246c9

Branch: refs/heads/branch-1.5
Commit: cb0246c9314892dbb3403488154b2d987c90b1dd
Parents: 4b99f72
Author: jerryshao 
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 10 15:32:05 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb0246c9/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index ab7f011..d58f4f6 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5030923ea -> 24d3357d6


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python API 
now has the same functionality as the Scala/Java API, so this change updates 
the description to make it more precise.

zsxwing tdas , please review, thanks a lot.

Author: jerryshao 

Closes #10246 from jerryshao/direct-kafka-doc-update.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24d3357d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24d3357d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24d3357d

Branch: refs/heads/master
Commit: 24d3357d66e14388faf8709b368edca70ea96432
Parents: 5030923
Author: jerryshao 
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 10 15:31:46 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24d3357d/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index b00351b..5be73c4 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit

2015-12-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b7b9f7727 -> e65c88536


[SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit

jira: https://issues.apache.org/jira/browse/SPARK-11602

Made a pass over the API changes in 1.6. Opening this PR for efficient discussion.

Author: Yuhao Yang 

Closes #9939 from hhbyyh/auditScala.

(cherry picked from commit 9fba9c8004d2b97549e5456fa7918965bec27336)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e65c8853
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e65c8853
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e65c8853

Branch: refs/heads/branch-1.6
Commit: e65c88536ad1843a45e0fe3cf1edadfdf4ad3460
Parents: b7b9f77
Author: Yuhao Yang 
Authored: Thu Dec 10 10:15:50 2015 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 10 10:16:04 2015 -0800

--
 .../src/main/scala/org/apache/spark/ml/feature/package-info.java | 2 +-
 .../scala/org/apache/spark/ml/regression/LinearRegression.scala  | 2 +-
 .../org/apache/spark/mllib/clustering/BisectingKMeans.scala  | 2 +-
 .../org/apache/spark/mllib/clustering/BisectingKMeansModel.scala | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java 
b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
index c22d2e0..7a35f2d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java
@@ -23,7 +23,7 @@
  * features into more suitable forms for model fitting.
  * Most feature transformers are implemented as {@link 
org.apache.spark.ml.Transformer}s, which
  * transforms one {@link org.apache.spark.sql.DataFrame} into another, e.g.,
- * {@link org.apache.spark.feature.HashingTF}.
+ * {@link org.apache.spark.ml.feature.HashingTF}.
  * Some feature transformers are implemented as {@link 
org.apache.spark.ml.Estimator}}s, because the
  * transformation requires some aggregated information of the dataset, e.g., 
document
  * frequencies in {@link org.apache.spark.ml.feature.IDF}.

http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 1db9166..5e58509 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -529,7 +529,7 @@ class LinearRegressionSummary private[regression] (
 val predictionCol: String,
 val labelCol: String,
 val model: LinearRegressionModel,
-val diagInvAtWA: Array[Double]) extends Serializable {
+private val diagInvAtWA: Array[Double]) extends Serializable {
 
   @transient private val metrics = new RegressionMetrics(
 predictions

http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index 82adfa6..54bf510 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -407,7 +407,7 @@ private object BisectingKMeans extends Serializable {
  */
 @Since("1.6.0")
 @Experimental
-class ClusteringTreeNode private[clustering] (
+private[clustering] class ClusteringTreeNode private[clustering] (
 val index: Int,
 val size: Long,
 private val centerWithNorm: VectorWithNorm,

http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index f942e56..9ccf96b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ 

spark git commit: [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument

2015-12-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e65c88536 -> 93ef24638


[SPARK-12234][SPARKR] Fix ```subset``` function error when only set 
```select``` argument

Fix the ```subset``` function error that occurs when only the ```select``` 
argument is set. Please refer to the 
[JIRA](https://issues.apache.org/jira/browse/SPARK-12234) for details of the 
error and how to reproduce it.

cc sun-rui felixcheung shivaram

Author: Yanbo Liang 

Closes #10217 from yanboliang/spark-12234.

(cherry picked from commit d9d354ed40eec56b3f03d32f4e2629d367b1bf02)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ef2463
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ef2463
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ef2463

Branch: refs/heads/branch-1.6
Commit: 93ef2463820928f434f9fb1542bc30cfb1cec9aa
Parents: e65c885
Author: Yanbo Liang 
Authored: Thu Dec 10 10:18:58 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Thu Dec 10 10:19:06 2015 -0800

--
 R/pkg/R/DataFrame.R   | 9 +++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/93ef2463/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 81b4e6b..f4c4a25 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1185,7 +1185,7 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #'
 #' Return subsets of DataFrame according to given conditions
 #' @param x A DataFrame
-#' @param subset A logical expression to filter on rows
+#' @param subset (Optional) A logical expression to filter on rows
 #' @param select expression for the single Column or a list of columns to 
select from the DataFrame
 #' @return A new DataFrame containing only the rows that meet the condition 
with selected columns
 #' @export
@@ -1206,10 +1206,15 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #'   df[df$age %in% c(19, 30), 1:2]
 #'   subset(df, df$age %in% c(19, 30), 1:2)
 #'   subset(df, df$age %in% c(19), select = c(1,2))
+#'   subset(df, select = c(1,2))
 #' }
 setMethod("subset", signature(x = "DataFrame"),
   function(x, subset, select, ...) {
-x[subset, select, ...]
+if (missing(subset)) {
+  x[, select, ...]
+} else {
+  x[subset, select, ...]
+}
   })
 
 #' Select

http://git-wip-us.apache.org/repos/asf/spark/blob/93ef2463/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 222c04a..2051784 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -799,6 +799,10 @@ test_that("subsetting", {
   expect_equal(count(df6), 1)
   expect_equal(columns(df6), c("name", "age"))
 
+  df7 <- subset(df, select = "name")
+  expect_equal(count(df7), 3)
+  expect_equal(columns(df7), c("name"))
+
   # Test base::subset is working
   expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 
68)
 })


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile

2015-12-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f939c71b1 -> b7b9f7727


[SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile

Add ```read.parquet``` to SparkR and deprecate ```parquetFile```. This change 
is similar to #10145 for ```jsonFile```.

Author: Yanbo Liang 

Closes #10191 from yanboliang/spark-12198.

(cherry picked from commit eeb58722ad73441eeb5f35f864be3c5392cfd426)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7b9f772
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7b9f772
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7b9f772

Branch: refs/heads/branch-1.6
Commit: b7b9f772751dc4ea7eb28a2bdb897a04e563fafa
Parents: f939c71
Author: Yanbo Liang 
Authored: Thu Dec 10 09:44:53 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Thu Dec 10 09:45:01 2015 -0800

--
 R/pkg/NAMESPACE   |  1 +
 R/pkg/R/SQLContext.R  | 16 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++
 3 files changed, 22 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 565a2b1..ba64bc5 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -270,6 +270,7 @@ export("as.DataFrame",
"loadDF",
"parquetFile",
"read.df",
+   "read.parquet",
"sql",
"table",
"tableNames",

http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 85541c8..f678c70 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -256,18 +256,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, 
samplingRatio = 1.0) {
   }
 }
 
-
 #' Create a DataFrame from a Parquet file.
 #'
 #' Loads a Parquet file, returning the result as a DataFrame.
 #'
 #' @param sqlContext SQLContext to use
-#' @param ... Path(s) of parquet file(s) to read.
+#' @param path Path of file to read. A vector of multiple paths is allowed.
 #' @return DataFrame
+#' @rdname read.parquet
+#' @name read.parquet
 #' @export
+read.parquet <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definiton of the text file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "parquet", paths)
+  dataFrame(sdf)
+}
 
+#' @rdname read.parquet
+#' @name parquetFile
+#' @export
 # TODO: Implement saveasParquetFile and write examples for both
 parquetFile <- function(sqlContext, ...) {
+  .Deprecated("read.parquet")
   # Allow the user to have a more flexible definiton of the text file path
   paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
   sdf <- callJMethod(sqlContext, "parquetFile", paths)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 39fc94a..222c04a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1420,22 +1420,25 @@ test_that("mutate(), transform(), rename() and 
names()", {
   detach(airquality)
 })
 
-test_that("write.df() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with read.parquet", {
   df <- jsonFile(sqlContext, jsonPath)
   write.df(df, parquetPath, "parquet", mode="overwrite")
-  parquetDF <- parquetFile(sqlContext, parquetPath)
+  parquetDF <- read.parquet(sqlContext, parquetPath)
   expect_is(parquetDF, "DataFrame")
   expect_equal(count(df), count(parquetDF))
 })
 
-test_that("parquetFile works with multiple input paths", {
+test_that("read.parquet()/parquetFile() works with multiple input paths", {
   df <- jsonFile(sqlContext, jsonPath)
   write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
   write.df(df, parquetPath2, "parquet", mode="overwrite")
-  parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2)
+  parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
   expect_is(parquetDF, "DataFrame")
   expect_equal(count(parquetDF), count(df) * 2)
+  parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, 
parquetPath2))
+  expect_is(parquetDF2, "DataFrame")
+  

spark git commit: [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters

2015-12-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f6d866173 -> b5e5812f9


[SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and 
suffix parameters

The original code does not properly handle the cases where the prefix is null, 
but suffix is not null - the suffix should be used but is not.

The fix builds the file name incrementally from the timestamp, adding the 
prefix and the suffix only when each is non-null and non-empty.

Author: bomeng 
Author: Bo Meng 

Closes #10185 from bomeng/SPARK-12136.

(cherry picked from commit e29704f90dfe67d9e276d242699ac0a00f64fb91)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5e5812f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5e5812f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5e5812f

Branch: refs/heads/branch-1.6
Commit: b5e5812f9ef8aa8d133a75bb8aa8dd8680130efa
Parents: f6d8661
Author: bomeng 
Authored: Thu Dec 10 12:53:53 2015 +
Committer: Sean Owen 
Committed: Thu Dec 10 12:54:08 2015 +

--
 .../org/apache/spark/streaming/StreamingContext.scala  | 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b5e5812f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6fb8ad3..53324e7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -887,12 +887,13 @@ object StreamingContext extends Logging {
   }
 
   private[streaming] def rddToFileName[T](prefix: String, suffix: String, 
time: Time): String = {
-if (prefix == null) {
-  time.milliseconds.toString
-} else if (suffix == null || suffix.length ==0) {
-  prefix + "-" + time.milliseconds
-} else {
-  prefix + "-" + time.milliseconds + "." + suffix
+var result = time.milliseconds.toString
+if (prefix != null && prefix.length > 0) {
+  result = s"$prefix-$result"
+}
+if (suffix != null && suffix.length > 0) {
+  result = s"$result.$suffix"
 }
+result
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters

2015-12-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d8ec081c9 -> e29704f90


[SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and 
suffix parameters

The original code does not properly handle the cases where the prefix is null, 
but suffix is not null - the suffix should be used but is not.

The fix builds the file name incrementally from the timestamp, adding the 
prefix and the suffix only when each is non-null and non-empty.

Author: bomeng 
Author: Bo Meng 

Closes #10185 from bomeng/SPARK-12136.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e29704f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e29704f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e29704f9

Branch: refs/heads/master
Commit: e29704f90dfe67d9e276d242699ac0a00f64fb91
Parents: d8ec081
Author: bomeng 
Authored: Thu Dec 10 12:53:53 2015 +
Committer: Sean Owen 
Committed: Thu Dec 10 12:53:53 2015 +

--
 .../org/apache/spark/streaming/StreamingContext.scala  | 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e29704f9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index cf843e3..b24c0d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -892,12 +892,13 @@ object StreamingContext extends Logging {
   }
 
   private[streaming] def rddToFileName[T](prefix: String, suffix: String, 
time: Time): String = {
-if (prefix == null) {
-  time.milliseconds.toString
-} else if (suffix == null || suffix.length ==0) {
-  prefix + "-" + time.milliseconds
-} else {
-  prefix + "-" + time.milliseconds + "." + suffix
+var result = time.milliseconds.toString
+if (prefix != null && prefix.length > 0) {
+  result = s"$prefix-$result"
+}
+if (suffix != null && suffix.length > 0) {
+  result = s"$result.$suffix"
 }
+result
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11530][MLLIB] Return eigenvalues with PCA model

2015-12-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master e29704f90 -> 21b3d2a75


[SPARK-11530][MLLIB] Return eigenvalues with PCA model

Add `computePrincipalComponentsAndVariance` to also compute PCA's explained 
variance.

CC mengxr

Author: Sean Owen 

Closes #9736 from srowen/SPARK-11530.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b3d2a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b3d2a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b3d2a7

Branch: refs/heads/master
Commit: 21b3d2a75f679b252e293000d706741dca33624a
Parents: e29704f
Author: Sean Owen 
Authored: Thu Dec 10 14:05:45 2015 +
Committer: Sean Owen 
Committed: Thu Dec 10 14:05:45 2015 +

--
 .../scala/org/apache/spark/ml/feature/PCA.scala | 20 ++--
 .../org/apache/spark/mllib/feature/PCA.scala| 16 +++---
 .../mllib/linalg/distributed/RowMatrix.scala| 33 +++-
 .../org/apache/spark/ml/feature/PCASuite.scala  |  7 +++--
 .../apache/spark/mllib/feature/PCASuite.scala   |  3 +-
 .../linalg/distributed/RowMatrixSuite.scala | 10 +-
 project/MimaExcludes.scala  |  3 ++
 7 files changed, 67 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index aa88cb0..53d33ea 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -73,7 +73,7 @@ class PCA (override val uid: String) extends 
Estimator[PCAModel] with PCAParams
 val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
 val pca = new feature.PCA(k = $(k))
 val pcaModel = pca.fit(input)
-copyValues(new PCAModel(uid, pcaModel.pc).setParent(this))
+copyValues(new PCAModel(uid, pcaModel.pc, 
pcaModel.explainedVariance).setParent(this))
   }
 
   override def transformSchema(schema: StructType): StructType = {
@@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] {
 @Experimental
 class PCAModel private[ml] (
 override val uid: String,
-val pc: DenseMatrix)
+val pc: DenseMatrix,
+val explainedVariance: DenseVector)
   extends Model[PCAModel] with PCAParams with MLWritable {
 
   import PCAModel._
@@ -123,7 +124,7 @@ class PCAModel private[ml] (
*/
   override def transform(dataset: DataFrame): DataFrame = {
 transformSchema(dataset.schema, logging = true)
-val pcaModel = new feature.PCAModel($(k), pc)
+val pcaModel = new feature.PCAModel($(k), pc, explainedVariance)
 val pcaOp = udf { pcaModel.transform _ }
 dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
   }
@@ -139,7 +140,7 @@ class PCAModel private[ml] (
   }
 
   override def copy(extra: ParamMap): PCAModel = {
-val copied = new PCAModel(uid, pc)
+val copied = new PCAModel(uid, pc, explainedVariance)
 copyValues(copied, extra).setParent(parent)
   }
 
@@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] {
 
   private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter {
 
-private case class Data(pc: DenseMatrix)
+private case class Data(pc: DenseMatrix, explainedVariance: DenseVector)
 
 override protected def saveImpl(path: String): Unit = {
   DefaultParamsWriter.saveMetadata(instance, path, sc)
-  val data = Data(instance.pc)
+  val data = Data(instance.pc, instance.explainedVariance)
   val dataPath = new Path(path, "data").toString
   
sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
 }
@@ -169,10 +170,11 @@ object PCAModel extends MLReadable[PCAModel] {
 override def load(path: String): PCAModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath)
-.select("pc")
+  val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
+sqlContext.read.parquet(dataPath)
+.select("pc", "explainedVariance")
 .head()
-  val model = new PCAModel(metadata.uid, pc)
+  val model = new PCAModel(metadata.uid, pc, explainedVariance)
   DefaultParamsReader.getAndSetParams(model, metadata)
   model
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
--
diff --git 

spark git commit: [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument

2015-12-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 9fba9c800 -> d9d354ed4


[SPARK-12234][SPARKR] Fix ```subset``` function error when only set 
```select``` argument

Fix the ```subset``` function error that occurs when only the ```select``` 
argument is set. Please refer to the 
[JIRA](https://issues.apache.org/jira/browse/SPARK-12234) for details of the 
error and how to reproduce it.

cc sun-rui felixcheung shivaram

Author: Yanbo Liang 

Closes #10217 from yanboliang/spark-12234.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9d354ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9d354ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9d354ed

Branch: refs/heads/master
Commit: d9d354ed40eec56b3f03d32f4e2629d367b1bf02
Parents: 9fba9c8
Author: Yanbo Liang 
Authored: Thu Dec 10 10:18:58 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Thu Dec 10 10:18:58 2015 -0800

--
 R/pkg/R/DataFrame.R   | 9 +++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d9d354ed/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 81b4e6b..f4c4a25 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1185,7 +1185,7 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #'
 #' Return subsets of DataFrame according to given conditions
 #' @param x A DataFrame
-#' @param subset A logical expression to filter on rows
+#' @param subset (Optional) A logical expression to filter on rows
 #' @param select expression for the single Column or a list of columns to 
select from the DataFrame
 #' @return A new DataFrame containing only the rows that meet the condition 
with selected columns
 #' @export
@@ -1206,10 +1206,15 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #'   df[df$age %in% c(19, 30), 1:2]
 #'   subset(df, df$age %in% c(19, 30), 1:2)
 #'   subset(df, df$age %in% c(19), select = c(1,2))
+#'   subset(df, select = c(1,2))
 #' }
 setMethod("subset", signature(x = "DataFrame"),
   function(x, subset, select, ...) {
-x[subset, select, ...]
+if (missing(subset)) {
+  x[, select, ...]
+} else {
+  x[subset, select, ...]
+}
   })
 
 #' Select

http://git-wip-us.apache.org/repos/asf/spark/blob/d9d354ed/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 222c04a..2051784 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -799,6 +799,10 @@ test_that("subsetting", {
   expect_equal(count(df6), 1)
   expect_equal(columns(df6), c("name", "age"))
 
+  df7 <- subset(df, select = "name")
+  expect_equal(count(df7), 3)
+  expect_equal(columns(df7), c("name"))
+
   # Test base::subset is working
   expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 
68)
 })


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan

2015-12-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 93ef24638 -> e541f703d


[SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata 
when visualizing SQL query plan

This PR backports PR #10004 to branch-1.6

It adds a private[sql] method metadata to SparkPlan, which can be used to 
describe detailed information about a physical plan during visualization. 
Specifically, this PR uses this method to provide details of PhysicalRDDs 
translated from a data source relation.

Author: Cheng Lian 

Closes #10250 from liancheng/spark-12012.for-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e541f703
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e541f703
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e541f703

Branch: refs/heads/branch-1.6
Commit: e541f703d72d3dd3ad96db55650c5b1a1a5a38e2
Parents: 93ef246
Author: Cheng Lian 
Authored: Thu Dec 10 10:19:44 2015 -0800
Committer: Yin Huai 
Committed: Thu Dec 10 10:19:49 2015 -0800

--
 python/pyspark/sql/dataframe.py |  2 +-
 .../spark/sql/execution/ExistingRDD.scala   | 19 ++---
 .../apache/spark/sql/execution/SparkPlan.scala  |  5 +++
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../datasources/DataSourceStrategy.scala| 22 --
 .../datasources/parquet/ParquetRelation.scala   | 10 +
 .../spark/sql/execution/ui/SparkPlanGraph.scala | 43 
 .../apache/spark/sql/sources/interfaces.scala   |  2 +-
 .../spark/sql/execution/PlannerSuite.scala  |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  7 +++-
 10 files changed, 83 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 746bb55..78ab475 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -213,7 +213,7 @@ class DataFrame(object):
 
 >>> df.explain()
 == Physical Plan ==
-Scan PhysicalRDD[age#0,name#1]
+Scan ExistingRDD[age#0,name#1]
 
 >>> df.explain(True)
 == Parsed Logical Plan ==

http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 623348f..b8a4302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
 output: Seq[Attribute],
 rdd: RDD[InternalRow],
-extraInformation: String,
+override val nodeName: String,
+override val metadata: Map[String, String] = Map.empty,
 override val outputsUnsafeRows: Boolean = false)
   extends LeafNode {
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def simpleString: String = "Scan " + extraInformation + 
output.mkString("[", ",", "]")
+  override def simpleString: String = {
+val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield 
s"$key: $value"
+s"Scan $nodeName${output.mkString("[", ",", 
"]")}${metadataEntries.mkString(" ", ", ", "")}"
+  }
 }
 
 private[sql] object PhysicalRDD {
+  // Metadata keys
+  val INPUT_PATHS = "InputPaths"
+  val PUSHED_FILTERS = "PushedFilters"
+
   def createFromDataSource(
   output: Seq[Attribute],
   rdd: RDD[InternalRow],
   relation: BaseRelation,
-  extraInformation: String = ""): PhysicalRDD = {
-PhysicalRDD(output, rdd, relation.toString + extraInformation,
-  relation.isInstanceOf[HadoopFsRelation])
+  metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+// All HadoopFsRelations output UnsafeRows
+val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a781777..ec98f81 100644
--- 

spark git commit: [SPARK-12242][SQL] Add DataFrame.transform method

2015-12-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b5e5812f9 -> f939c71b1


[SPARK-12242][SQL] Add DataFrame.transform method

Author: Reynold Xin 

Closes #10226 from rxin/df-transform.

(cherry picked from commit 76540b6df5370b463277d3498097b2cc2d2e97a8)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f939c71b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f939c71b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f939c71b

Branch: refs/heads/branch-1.6
Commit: f939c71b187cff3a5bb63aa3659429b6efb0626d
Parents: b5e5812
Author: Reynold Xin 
Authored: Thu Dec 10 22:23:10 2015 +0800
Committer: Reynold Xin 
Committed: Thu Dec 10 22:23:26 2015 +0800

--
 .../src/main/scala/org/apache/spark/sql/Column.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/DataFrame.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f939c71b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index d641fca..297ef22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -84,7 +84,7 @@ class TypedColumn[-T, U](
  *   col("`a.column.with.dots`") // Escape `.` in column names.
  *   $"columnName"   // Scala short hand for a named column.
  *   expr("a + 1")   // A column that is constructed from a parsed 
SQL Expression.
- *   lit("1")// A column that produces a literal 
(constant) value.
+ *   lit("abc")  // A column that produces a literal 
(constant) value.
  * }}}
  *
  * [[Column]] objects can be composed to form complex expressions:

http://git-wip-us.apache.org/repos/asf/spark/blob/f939c71b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index eb87003..1acfe84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1414,6 +1414,19 @@ class DataFrame private[sql](
   def first(): Row = head()
 
   /**
+   * Concise syntax for chaining custom transformations.
+   * {{{
+   *   def featurize(ds: DataFrame) = ...
+   *
+   *   df
+   * .transform(featurize)
+   * .transform(...)
+   * }}}
+   * @since 1.6.0
+   */
+  def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
+
+  /**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
* @since 1.3.0


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12242][SQL] Add DataFrame.transform method

2015-12-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 21b3d2a75 -> 76540b6df


[SPARK-12242][SQL] Add DataFrame.transform method

Author: Reynold Xin 

Closes #10226 from rxin/df-transform.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76540b6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76540b6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76540b6d

Branch: refs/heads/master
Commit: 76540b6df5370b463277d3498097b2cc2d2e97a8
Parents: 21b3d2a
Author: Reynold Xin 
Authored: Thu Dec 10 22:23:10 2015 +0800
Committer: Reynold Xin 
Committed: Thu Dec 10 22:23:10 2015 +0800

--
 .../src/main/scala/org/apache/spark/sql/Column.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/DataFrame.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/76540b6d/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index d641fca..297ef22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -84,7 +84,7 @@ class TypedColumn[-T, U](
  *   col("`a.column.with.dots`") // Escape `.` in column names.
  *   $"columnName"   // Scala short hand for a named column.
  *   expr("a + 1")   // A column that is constructed from a parsed 
SQL Expression.
- *   lit("1")// A column that produces a literal 
(constant) value.
+ *   lit("abc")  // A column that produces a literal 
(constant) value.
  * }}}
  *
  * [[Column]] objects can be composed to form complex expressions:

http://git-wip-us.apache.org/repos/asf/spark/blob/76540b6d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 243a8c8..da180a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1422,6 +1422,19 @@ class DataFrame private[sql](
   def first(): Row = head()
 
   /**
+   * Concise syntax for chaining custom transformations.
+   * {{{
+   *   def featurize(ds: DataFrame) = ...
+   *
+   *   df
+   * .transform(featurize)
+   * .transform(...)
+   * }}}
+   * @since 1.6.0
+   */
+  def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
+
+  /**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
* @since 1.3.0


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11832][CORE] Process arguments in spark-shell for Scala 2.11

2015-12-10 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 76540b6df -> db5165246


[SPARK-11832][CORE] Process arguments in spark-shell for Scala 2.11

Process arguments passed to the spark-shell. Fixes running the spark-shell from 
within a build environment.

Author: Jakob Odersky 

Closes #9824 from jodersky/shell-2.11.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db516524
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db516524
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db516524

Branch: refs/heads/master
Commit: db5165246f2888537dd0f3d4c5a515875c7358ed
Parents: 76540b6
Author: Jakob Odersky 
Authored: Thu Dec 10 08:35:52 2015 -0800
Committer: Marcelo Vanzin 
Committed: Thu Dec 10 08:35:52 2015 -0800

--
 .../main/scala/org/apache/spark/repl/Main.scala | 37 ++--
 .../scala/org/apache/spark/repl/ReplSuite.scala |  3 +-
 2 files changed, 27 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/db516524/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
--
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 627148d..455a6b9 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -31,24 +31,39 @@ object Main extends Logging {
   val tmp = System.getProperty("java.io.tmpdir")
   val rootDir = conf.get("spark.repl.classdir", tmp)
   val outputDir = Utils.createTempDir(rootDir)
-  val s = new Settings()
-  s.processArguments(List("-Yrepl-class-based",
-"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-"-classpath", getAddedJars.mkString(File.pathSeparator)), true)
   // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set 
if needed
   lazy val classServer = new HttpServer(conf, outputDir, new 
SecurityManager(conf))
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.
 
+  private var hasErrors = false
+
+  private def scalaOptionError(msg: String): Unit = {
+hasErrors = true
+Console.err.println(msg)
+  }
+
   def main(args: Array[String]) {
-if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", 
"true")
-// Start the classServer and store its URI in a spark system property
-// (which will be passed to executors so that they can connect to it)
-classServer.start()
-interp.process(s) // Repl starts and goes in loop of R.E.P.L
-classServer.stop()
-Option(sparkContext).map(_.stop)
+   
+val interpArguments = List(
+  "-Yrepl-class-based",
+  "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
+  "-classpath", getAddedJars.mkString(File.pathSeparator)
+) ++ args.toList
+
+val settings = new Settings(scalaOptionError)
+settings.processArguments(interpArguments, true)
+
+if (!hasErrors) {
+  if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", 
"true")
+  // Start the classServer and store its URI in a spark system property
+  // (which will be passed to executors so that they can connect to it)
+  classServer.start()
+  interp.process(settings) // Repl starts and goes in loop of R.E.P.L
+  classServer.stop()
+  Option(sparkContext).map(_.stop)
+}
   }
 
   def getAddedJars: Array[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/db516524/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index bf89979..63f3688 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -54,8 +54,7 @@ class ReplSuite extends SparkFunSuite {
   new SparkILoop(in, new PrintWriter(out))
 }
 org.apache.spark.repl.Main.interp = interp
-Main.s.processArguments(List("-classpath", classpath), true)
-Main.main(Array()) // call main
+Main.main(Array("-classpath", classpath)) // call main
 org.apache.spark.repl.Main.interp = null
 
 if (oldExecutorClasspath != null) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org