spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread

2016-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8bf56cc46 -> b020ce408


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When you start a stream, resolving its source (for example, resolving partition
columns) can take a long time. That long-running work should not block the main
thread on which `query.start()` was called; it should happen in the stream
execution thread, before any triggers are started.
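
A minimal sketch of the general pattern (hypothetical names, not the actual
`StreamExecution` API): wrap the expensive resolution in a `lazy val` and force
it only from the background execution thread, so the caller of `start()`
returns immediately.
```
// Hypothetical sketch: defer slow source resolution off the caller thread.
class QueryRunner(analyzedPlan: String) {
  // Resolution (e.g. listing partition columns) runs on first access, not at construction.
  lazy val resolvedPlan: String = {
    Thread.sleep(2000) // stand-in for slow source resolution
    s"resolved($analyzedPlan)"
  }

  def start(): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        val plan = resolvedPlan // forced on the execution thread, before any trigger
        println(s"running with $plan")
      }
    })
    t.setDaemon(true)
    t.start()
    t // returns immediately; the resolution cost is paid on the background thread
  }
}
```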

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz 

Closes #16238 from brkyvz/SPARK-18811.

(cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40

Branch: refs/heads/branch-2.1
Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e
Parents: 8bf56cc
Author: Burak Yavuz 
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 9 22:50:10 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 ++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +
 .../sql/streaming/util/DefaultSource.scala  | 66 
 4 files changed, 116 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
 checkpointRoot: String,
-val logicalPlan: LogicalPlan,
+analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
 Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+var nextSourceId = 0L
+analyzedPlan.transform {
+  case StreamingRelation(dataSource, _, output) =>
+// Materialize source to avoid creating it in every batch
+val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+val source = dataSource.createSource(metadataPath)
+nextSourceId += 1
+// We still need to use the previous `output` instead of `source.schema` as attributes in
+// "df.logicalPlan" has already used attributes of the previous `output`.
+StreamingExecutionRelation(source, output)
+}
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
 logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
 case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
   // While active, repeatedly attempt to run batches.
   SparkSession.setActiveSession(sparkSession)
 
+  updateStatusMessage("Initializing sources")
+  // force initialization of the logical plan so that the sources can be created
+  logicalPlan
+
   triggerExecutor.execute(() => {
 startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 

spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread

2016-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 3e11d5bfe -> 63c915987


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When you start a stream, resolving its source (for example, resolving partition
columns) can take a long time. That long-running work should not block the main
thread on which `query.start()` was called; it should happen in the stream
execution thread, before any triggers are started.

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz 

Closes #16238 from brkyvz/SPARK-18811.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598

Branch: refs/heads/master
Commit: 63c9159870ee274c68e24360594ca01d476b9ace
Parents: 3e11d5b
Author: Burak Yavuz 
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 9 22:49:51 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 ++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +
 .../sql/streaming/util/DefaultSource.scala  | 66 
 4 files changed, 116 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
 checkpointRoot: String,
-val logicalPlan: LogicalPlan,
+analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
 Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+var nextSourceId = 0L
+analyzedPlan.transform {
+  case StreamingRelation(dataSource, _, output) =>
+// Materialize source to avoid creating it in every batch
+val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+val source = dataSource.createSource(metadataPath)
+nextSourceId += 1
+// We still need to use the previous `output` instead of `source.schema` as attributes in
+// "df.logicalPlan" has already used attributes of the previous `output`.
+StreamingExecutionRelation(source, output)
+}
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
 logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
 case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
   // While active, repeatedly attempt to run batches.
   SparkSession.setActiveSession(sparkSession)
 
+  updateStatusMessage("Initializing sources")
+  // force initialization of the logical plan so that the sources can be created
+  logicalPlan
+
   triggerExecutor.execute(() => {
 startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
 

spark git commit: [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values

2016-12-09 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e45345d91 -> 8bf56cc46


[SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods 
with void return values

## What changes were proposed in this pull request?

Several SparkR APIs that call into JVM methods with void return values have
their results printed out (as `NULL`), especially when running in a REPL or IDE.
For example:
```
> setLogLevel("WARN")
NULL
```
We should fix this to make the result clearer.

Also noticed a small change to the return value of dropTempView in 2.1 - added
doc and a test for it.

## How was this patch tested?

Manually - I didn't find an expect_*() method in testthat for this.

Author: Felix Cheung 

Closes #16237 from felixcheung/rinvis.

(cherry picked from commit 3e11d5bfef2f05bd6d42c4d6188eae6d63c963ef)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bf56cc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bf56cc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bf56cc4

Branch: refs/heads/branch-2.1
Commit: 8bf56cc46b96874565ebd8109f62e69e6c0cf151
Parents: e45345d
Author: Felix Cheung 
Authored: Fri Dec 9 19:06:05 2016 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Dec 9 19:06:28 2016 -0800

--
 R/pkg/R/SQLContext.R  |  7 ---
 R/pkg/R/context.R |  6 +++---
 R/pkg/R/sparkR.R  |  6 +++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++---
 4 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 38d83c6..6f48cd6 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -634,7 +634,7 @@ tableNames <- function(x, ...) {
 cacheTable.default <- function(tableName) {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "cacheTable", tableName)
+  invisible(callJMethod(catalog, "cacheTable", tableName))
 }
 
 cacheTable <- function(x, ...) {
@@ -663,7 +663,7 @@ cacheTable <- function(x, ...) {
 uncacheTable.default <- function(tableName) {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "uncacheTable", tableName)
+  invisible(callJMethod(catalog, "uncacheTable", tableName))
 }
 
 uncacheTable <- function(x, ...) {
@@ -686,7 +686,7 @@ uncacheTable <- function(x, ...) {
 clearCache.default <- function() {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "clearCache")
+  invisible(callJMethod(catalog, "clearCache"))
 }
 
 clearCache <- function() {
@@ -730,6 +730,7 @@ dropTempTable <- function(x, ...) {
 #' If the view has been cached before, then it will also be uncached.
 #'
 #' @param viewName the name of the view to be dropped.
+#' @return TRUE if the view is dropped successfully, FALSE otherwise.
 #' @rdname dropTempView
 #' @name dropTempView
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 438d77a..1138caf 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -87,8 +87,8 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' in the list are split into \code{numSlices} slices and distributed to nodes
 #' in the cluster.
 #'
-#' If size of serialized slices is larger than spark.r.maxAllocationLimit or 
(200MB), the function 
-#' will write it to disk and send the file name to JVM. Also to make sure each 
slice is not 
+#' If size of serialized slices is larger than spark.r.maxAllocationLimit or 
(200MB), the function
+#' will write it to disk and send the file name to JVM. Also to make sure each 
slice is not
 #' larger than that limit, number of slices may be increased.
 #'
 #' @param sc SparkContext to use
@@ -379,5 +379,5 @@ spark.lapply <- function(list, func) {
 #' @note setLogLevel since 2.0.0
 setLogLevel <- function(level) {
   sc <- getSparkContext()
-  callJMethod(sc, "setLogLevel", level)
+  invisible(callJMethod(sc, "setLogLevel", level))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 43bff97..c57cc8f 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -427,7 +427,7 @@ sparkR.session <- function(
 #' @method setJobGroup 

spark git commit: [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values

2016-12-09 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master d2493a203 -> 3e11d5bfe


[SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods 
with void return values

## What changes were proposed in this pull request?

Several SparkR APIs that call into JVM methods with void return values have
their results printed out (as `NULL`), especially when running in a REPL or IDE.
For example:
```
> setLogLevel("WARN")
NULL
```
We should fix this to make the result clearer.

Also noticed a small change to the return value of dropTempView in 2.1 - added
doc and a test for it.

## How was this patch tested?

Manually - I didn't find an expect_*() method in testthat for this.

Author: Felix Cheung 

Closes #16237 from felixcheung/rinvis.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e11d5bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e11d5bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e11d5bf

Branch: refs/heads/master
Commit: 3e11d5bfef2f05bd6d42c4d6188eae6d63c963ef
Parents: d2493a2
Author: Felix Cheung 
Authored: Fri Dec 9 19:06:05 2016 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Dec 9 19:06:05 2016 -0800

--
 R/pkg/R/SQLContext.R  |  7 ---
 R/pkg/R/context.R |  6 +++---
 R/pkg/R/sparkR.R  |  6 +++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++---
 4 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 38d83c6..6f48cd6 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -634,7 +634,7 @@ tableNames <- function(x, ...) {
 cacheTable.default <- function(tableName) {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "cacheTable", tableName)
+  invisible(callJMethod(catalog, "cacheTable", tableName))
 }
 
 cacheTable <- function(x, ...) {
@@ -663,7 +663,7 @@ cacheTable <- function(x, ...) {
 uncacheTable.default <- function(tableName) {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "uncacheTable", tableName)
+  invisible(callJMethod(catalog, "uncacheTable", tableName))
 }
 
 uncacheTable <- function(x, ...) {
@@ -686,7 +686,7 @@ uncacheTable <- function(x, ...) {
 clearCache.default <- function() {
   sparkSession <- getSparkSession()
   catalog <- callJMethod(sparkSession, "catalog")
-  callJMethod(catalog, "clearCache")
+  invisible(callJMethod(catalog, "clearCache"))
 }
 
 clearCache <- function() {
@@ -730,6 +730,7 @@ dropTempTable <- function(x, ...) {
 #' If the view has been cached before, then it will also be uncached.
 #'
 #' @param viewName the name of the view to be dropped.
+#' @return TRUE if the view is dropped successfully, FALSE otherwise.
 #' @rdname dropTempView
 #' @name dropTempView
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 438d77a..1138caf 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -87,8 +87,8 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' in the list are split into \code{numSlices} slices and distributed to nodes
 #' in the cluster.
 #'
-#' If size of serialized slices is larger than spark.r.maxAllocationLimit or 
(200MB), the function 
-#' will write it to disk and send the file name to JVM. Also to make sure each 
slice is not 
+#' If size of serialized slices is larger than spark.r.maxAllocationLimit or 
(200MB), the function
+#' will write it to disk and send the file name to JVM. Also to make sure each 
slice is not
 #' larger than that limit, number of slices may be increased.
 #'
 #' @param sc SparkContext to use
@@ -379,5 +379,5 @@ spark.lapply <- function(list, func) {
 #' @note setLogLevel since 2.0.0
 setLogLevel <- function(level) {
   sc <- getSparkContext()
-  callJMethod(sc, "setLogLevel", level)
+  invisible(callJMethod(sc, "setLogLevel", level))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 43bff97..c57cc8f 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -427,7 +427,7 @@ sparkR.session <- function(
 #' @method setJobGroup default
 setJobGroup.default <- function(groupId, description, interruptOnCancel) {
   sc <- getSparkContext()
-  callJMethod(sc, 

spark git commit: [SPARK-18812][MLLIB] explain "Spark ML"

2016-12-09 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 562507ef0 -> e45345d91


[SPARK-18812][MLLIB] explain "Spark ML"

## What changes were proposed in this pull request?

There has been some confusion around "Spark ML" vs. "MLlib". This PR adds some 
FAQ-like entries to the MLlib user guide to explain "Spark ML" and reduce the 
confusion.

I checked the [Spark FAQ page](http://spark.apache.org/faq.html), which seems too
high-level for the content here, so I added it to the MLlib user guide instead.

cc: mateiz

Author: Xiangrui Meng 

Closes #16241 from mengxr/SPARK-18812.

(cherry picked from commit d2493a203e852adf63dde4e1fc993e8d11efec3d)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e45345d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e45345d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e45345d9

Branch: refs/heads/branch-2.1
Commit: e45345d91e333e0b5f9219e857affeda461863c6
Parents: 562507e
Author: Xiangrui Meng 
Authored: Fri Dec 9 17:34:52 2016 -0800
Committer: Xiangrui Meng 
Committed: Fri Dec 9 17:34:58 2016 -0800

--
 docs/ml-guide.md | 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e45345d9/docs/ml-guide.md
--
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index ddf81be..9717619 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -35,6 +35,18 @@ The primary Machine Learning API for Spark is now the 
[DataFrame](sql-programmin
 * The DataFrame-based API for MLlib provides a uniform API across ML 
algorithms and across multiple languages.
 * DataFrames facilitate practical ML Pipelines, particularly feature 
transformations.  See the [Pipelines guide](ml-pipeline.html) for details.
 
+*What is "Spark ML"?*
+
+* "Spark ML" is not an official name but occasionally used to refer to the 
MLlib DataFrame-based API.
+  This is majorly due to the `org.apache.spark.ml` Scala package name used by 
the DataFrame-based API, 
+  and the "Spark ML Pipelines" term we used initially to emphasize the 
pipeline concept.
+  
+*Is MLlib deprecated?*
+
+* No. MLlib includes both the RDD-based API and the DataFrame-based API.
+  The RDD-based API is now in maintenance mode.
+  But neither API is deprecated, nor MLlib as a whole.
+
 # Dependencies
 
 MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/), 
which depends on





spark git commit: [SPARK-18812][MLLIB] explain "Spark ML"

2016-12-09 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master cf33a8628 -> d2493a203


[SPARK-18812][MLLIB] explain "Spark ML"

## What changes were proposed in this pull request?

There has been some confusion around "Spark ML" vs. "MLlib". This PR adds some 
FAQ-like entries to the MLlib user guide to explain "Spark ML" and reduce the 
confusion.

I checked the [Spark FAQ page](http://spark.apache.org/faq.html), which seems too
high-level for the content here, so I added it to the MLlib user guide instead.

cc: mateiz

Author: Xiangrui Meng 

Closes #16241 from mengxr/SPARK-18812.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2493a20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2493a20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2493a20

Branch: refs/heads/master
Commit: d2493a203e852adf63dde4e1fc993e8d11efec3d
Parents: cf33a86
Author: Xiangrui Meng 
Authored: Fri Dec 9 17:34:52 2016 -0800
Committer: Xiangrui Meng 
Committed: Fri Dec 9 17:34:52 2016 -0800

--
 docs/ml-guide.md | 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2493a20/docs/ml-guide.md
--
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index ddf81be..9717619 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -35,6 +35,18 @@ The primary Machine Learning API for Spark is now the 
[DataFrame](sql-programmin
 * The DataFrame-based API for MLlib provides a uniform API across ML 
algorithms and across multiple languages.
 * DataFrames facilitate practical ML Pipelines, particularly feature 
transformations.  See the [Pipelines guide](ml-pipeline.html) for details.
 
+*What is "Spark ML"?*
+
+* "Spark ML" is not an official name but occasionally used to refer to the 
MLlib DataFrame-based API.
+  This is majorly due to the `org.apache.spark.ml` Scala package name used by 
the DataFrame-based API, 
+  and the "Spark ML Pipelines" term we used initially to emphasize the 
pipeline concept.
+  
+*Is MLlib deprecated?*
+
+* No. MLlib includes both the RDD-based API and the DataFrame-based API.
+  The RDD-based API is now in maintenance mode.
+  But neither API is deprecated, nor MLlib as a whole.
+
 # Dependencies
 
 MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/), 
which depends on





spark git commit: [SPARK-4105] retry the fetch or stage if shuffle block is corrupt

2016-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master d60ab5fd9 -> cf33a8628


[SPARK-4105] retry the fetch or stage if shuffle block is corrupt

## What changes were proposed in this pull request?

There is an outstanding issue that has existed for a long time: sometimes the
shuffle blocks are corrupt and can't be decompressed. We recently hit this in
three different workloads; sometimes we can reproduce it on every try, sometimes
we can't. I also found that when the corruption happened, the beginning and end
of the blocks were correct and the corruption was in the middle. In one case the
block id string was corrupted by a single character. It seems very likely that
the corruption is introduced by some faulty machine/hardware, and the 16-bit
checksum in TCP is not strong enough to identify all of the corruption.

Unfortunately, Spark has no checksum for shuffle blocks or broadcast blocks, so
the job will fail if any corruption happens in a shuffle block read from disk or
a broadcast block transferred over the network. This PR tries to detect the
corruption after fetching shuffle blocks by decompressing them, because most
compression codecs already embed checksums. On detection it retries the block,
or fails with FetchFailure, so the previous stage can be retried on different
(still random) machines.

Checksum for broadcast will be added by another PR.
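
A rough sketch of the detection idea, under simplified assumptions (plain GZIP
over a byte array; the real change lives in `ShuffleBlockFetcherIterator` and
uses Spark's own codecs and retry/FetchFailed handling):
```
import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.util.zip.GZIPInputStream

object CorruptionCheck {
  // Hypothetical helper: eagerly decompress a fetched block so the codec's own
  // checksum surfaces corruption, and re-fetch a limited number of times before
  // letting the failure propagate (stand-in for reporting a FetchFailure).
  def fetchVerified(fetch: () => Array[Byte], maxRetries: Int = 1): InputStream = {
    var attempt = 0
    var verified: Array[Byte] = null
    while (verified == null) {
      val bytes = fetch()
      try {
        val probe = new GZIPInputStream(new ByteArrayInputStream(bytes))
        val buf = new Array[Byte](8192)
        while (probe.read(buf) != -1) {} // full read triggers checksum validation
        verified = bytes
      } catch {
        case _: IOException if attempt < maxRetries =>
          attempt += 1 // corrupt block: retry the fetch
      }
    }
    new GZIPInputStream(new ByteArrayInputStream(verified))
  }
}
```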

## How was this patch tested?

Added unit tests

Author: Davies Liu 

Closes #15923 from davies/detect_corrupt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf33a862
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf33a862
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf33a862

Branch: refs/heads/master
Commit: cf33a86285629abe72c1acf235b8bfa6057220a8
Parents: d60ab5f
Author: Davies Liu 
Authored: Fri Dec 9 15:44:22 2016 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 9 15:44:22 2016 -0800

--
 .../spark/shuffle/BlockStoreShuffleReader.scala |  13 +-
 .../storage/ShuffleBlockFetcherIterator.scala   | 133 +-
 .../spark/util/io/ChunkedByteBuffer.scala   |   2 +-
 .../ShuffleBlockFetcherIteratorSuite.scala  | 172 ++-
 4 files changed, 263 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cf33a862/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index b9d8349..8b2e26c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -42,24 +42,21 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-val blockFetcherItr = new ShuffleBlockFetcherIterator(
+val wrappedStreams = new ShuffleBlockFetcherIterator(
   context,
   blockManager.shuffleClient,
   blockManager,
   mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
+  serializerManager.wrapStream,
   // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
   SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
-  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))
-
-// Wrap the streams for compression and encryption based on configuration
-val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
-  serializerManager.wrapStream(blockId, inputStream)
-}
+  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
+  SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
 
 val serializerInstance = dep.serializer.newInstance()
 
 // Create a key/value iterator for each stream
-val recordIter = wrappedStreams.flatMap { wrappedStream =>
+val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
   // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
   // NextIterator. The NextIterator makes sure that close() is called on the
   // underlying InputStream when all records have been read.

http://git-wip-us.apache.org/repos/asf/spark/blob/cf33a862/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
--
diff --git 

spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

2016-12-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 eb2d9bfd4 -> 562507ef0


[SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

## What changes were proposed in this pull request?

This PR avoids a negative result from a `toInt` cast caused by signed integer
overflow (e.g. 0x__1???L.toInt < 0). It performs the cast only after we can
ensure the value is within the range of a signed integer (the result of
`max(array.length, ???)` is always integer).
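
A small worked example (illustrative values, not taken from the patch) of why
the cast has to happen after the `min` on `Long`s:
```
// Illustrative only: a span larger than Int.MaxValue bytes.
val bufferLength = 1 << 20                 // 1 MB staging buffer
val span: Long = 0x180000000L              // ~6.4 GB left to copy

// Casting first (old code): the low 32 bits are 0x80000000, i.e. Int.MinValue.
val unsafe = math.min(bufferLength, span.toInt)        // -2147483648, a negative "size"

// Taking min on Longs first (fixed code): bounded by bufferLength, so toInt is safe.
val safe = math.min(bufferLength.toLong, span).toInt   // 1048576
```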

## How was this patch tested?

Manually executed query68 of TPC-DS with 100TB

Author: Kazuaki Ishizaki 

Closes #16235 from kiszk/SPARK-18745.

(cherry picked from commit d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/562507ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/562507ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/562507ef

Branch: refs/heads/branch-2.1
Commit: 562507ef038f09ff422e9831416af5119282a9d0
Parents: eb2d9bf
Author: Kazuaki Ishizaki 
Authored: Fri Dec 9 23:13:36 2016 +0100
Committer: Herman van Hovell 
Committed: Fri Dec 9 23:13:50 2016 +0100

--
 .../apache/spark/sql/execution/joins/HashedRelation.scala| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/562507ef/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 8821c0d..b9f6601 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = len * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
+  val size = Math.min(buffer.length, end - offset)
   Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
-  writeBuffer(buffer, 0, size)
+  writeBuffer(buffer, 0, size.toInt)
   offset += size
 }
   }
@@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = length * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
-  readBuffer(buffer, 0, size)
+  val size = Math.min(buffer.length, end - offset)
+  readBuffer(buffer, 0, size.toInt)
   Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size)
   offset += size
 }





spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

2016-12-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 65b4b0561 -> 2c342e5a4


[SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

## What changes were proposed in this pull request?

This PR avoids a negative result from a `toInt` cast caused by signed integer
overflow (e.g. 0x__1???L.toInt < 0). It performs the cast only after we can
ensure the value is within the range of a signed integer (the result of
`max(array.length, ???)` is always integer).

## How was this patch tested?

Manually executed query68 of TPC-DS with 100TB

Author: Kazuaki Ishizaki 

Closes #16235 from kiszk/SPARK-18745.

(cherry picked from commit d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c342e5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c342e5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c342e5a

Branch: refs/heads/branch-2.0
Commit: 2c342e5a4a9c88ed1ffc2ff19dd0b2eb3b6336ac
Parents: 65b4b05
Author: Kazuaki Ishizaki 
Authored: Fri Dec 9 23:13:36 2016 +0100
Committer: Herman van Hovell 
Committed: Fri Dec 9 23:14:04 2016 +0100

--
 .../apache/spark/sql/execution/joins/HashedRelation.scala| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c342e5a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 8821c0d..b9f6601 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = len * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
+  val size = Math.min(buffer.length, end - offset)
   Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
-  writeBuffer(buffer, 0, size)
+  writeBuffer(buffer, 0, size.toInt)
   offset += size
 }
   }
@@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = length * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
-  readBuffer(buffer, 0, size)
+  val size = Math.min(buffer.length, end - offset)
+  readBuffer(buffer, 0, size.toInt)
   Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size)
   offset += size
 }





spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

2016-12-09 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master b08b50045 -> d60ab5fd9


[SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

## What changes were proposed in this pull request?

This PR avoids a negative result from a `toInt` cast caused by signed integer
overflow (e.g. 0x__1???L.toInt < 0). It performs the cast only after we can
ensure the value is within the range of a signed integer (the result of
`max(array.length, ???)` is always integer).

## How was this patch tested?

Manually executed query68 of TPC-DS with 100TB

Author: Kazuaki Ishizaki 

Closes #16235 from kiszk/SPARK-18745.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60ab5fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60ab5fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60ab5fd

Branch: refs/heads/master
Commit: d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c
Parents: b08b500
Author: Kazuaki Ishizaki 
Authored: Fri Dec 9 23:13:36 2016 +0100
Committer: Herman van Hovell 
Committed: Fri Dec 9 23:13:36 2016 +0100

--
 .../apache/spark/sql/execution/joins/HashedRelation.scala| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d60ab5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 8821c0d..b9f6601 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = len * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
+  val size = Math.min(buffer.length, end - offset)
   Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
-  writeBuffer(buffer, 0, size)
+  writeBuffer(buffer, 0, size.toInt)
   offset += size
 }
   }
@@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 var offset: Long = Platform.LONG_ARRAY_OFFSET
 val end = length * 8L + Platform.LONG_ARRAY_OFFSET
 while (offset < end) {
-  val size = Math.min(buffer.length, (end - offset).toInt)
-  readBuffer(buffer, 0, size)
+  val size = Math.min(buffer.length, end - offset)
+  readBuffer(buffer, 0, size.toInt)
   Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size)
   offset += size
 }





spark git commit: [SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis

2016-12-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master be5fc6ef7 -> b08b50045


[SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming 
+ kinesis

## What changes were proposed in this pull request?
This PR makes input rates in the timeline flatter for Spark Streaming + Kinesis.
Since Kinesis workers fetch records and push them into block generators in bulk,
the timeline in the web UI shows many spikes when `maxRates` is applied (see
Figure.1 below). This fix splits the fetched input records into multiple
`addRecords` calls (a sketch of the split follows the figures).

Figure.1 Apply `maxRates=500` in vanilla Spark:
https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png

Figure.2 Apply `maxRates=500` in Spark with my patch:
https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png
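
A plain-Scala sketch of the mini-batch split (a hypothetical helper; the actual
change uses `batch.subList` and `receiver.getCurrentLimit` inside
`KinesisRecordProcessor`, as shown in the diff below):
```
// Split a fetched batch into chunks no larger than the current rate limit,
// so the block generator receives records in smaller, rate-limited pushes.
def splitByLimit[T](batch: Seq[T], maxRecords: Int): Seq[Seq[T]] = {
  require(maxRecords > 0, "rate limit must be positive")
  (0 until batch.size by maxRecords).map { start =>
    batch.slice(start, math.min(start + maxRecords, batch.size))
  }
}

// 1050 fetched records with maxRecords = 500 become pushes of 500, 500, and 50.
val chunks = splitByLimit(1 to 1050, 500)
assert(chunks.map(_.size) == Seq(500, 500, 50))
```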

## How was this patch tested?
Added tests that check the input records are split into multiple `addRecords` calls.

Author: Takeshi YAMAMURO 

Closes #16114 from maropu/SPARK-18620.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b08b5004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b08b5004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b08b5004

Branch: refs/heads/master
Commit: b08b5004563b28d10b07b70946a9f72408ed228a
Parents: be5fc6e
Author: Takeshi YAMAMURO 
Authored: Sat Dec 10 05:32:04 2016 +0800
Committer: Sean Owen 
Committed: Sat Dec 10 05:32:04 2016 +0800

--
 .../spark/streaming/kinesis/KinesisReceiver.scala  |  6 ++
 .../streaming/kinesis/KinesisRecordProcessor.scala | 14 --
 .../streaming/kinesis/KinesisReceiverSuite.scala   | 17 +
 3 files changed, 35 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 858368d..393e56a 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
 }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+assert(blockGenerator != null)
+math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
+  }
+
   /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
 Option(shardIdToLatestStoredSeqNum.get(shardId))

http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
--
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index a0ccd08..73ccc4a 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: 
KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: 
IRecordProcessorCheckpointer) {
 if (!receiver.isStopped()) {
   try {
-receiver.addRecords(shardId, batch)
-logDebug(s"Stored: Worker $workerId stored ${batch.size} records for 
shardId $shardId")
+// Limit the number of processed records from Kinesis stream. This is 
because the KCL cannot
+// control the number of aggregated records to be fetched even if we 
set `MaxRecords`
+// in `KinesisClientLibConfiguration`. For example, if we set 10 to 
the number of max
+// records in a worker and a producer aggregates two records into one 
message, the worker
+// possibly 20 records every callback function called.
+val maxRecords = receiver.getCurrentLimit
+for (start <- 0 until batch.size by maxRecords) {
+  val miniBatch = batch.subList(start, math.min(start + maxRecords, 
batch.size))
+  receiver.addRecords(shardId, miniBatch)
+  

[2/2] spark-website git commit: html changes for mllib site

2016-12-09 Thread jkbradley
html changes for mllib site


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/a82adf04
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/a82adf04
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/a82adf04

Branch: refs/heads/asf-site
Commit: a82adf043744269bfd81d78f2f04e0307fd4626b
Parents: 057cad1
Author: Joseph K. Bradley 
Authored: Wed Dec 7 12:43:12 2016 -0800
Committer: Joseph K. Bradley 
Committed: Wed Dec 7 12:43:12 2016 -0800

--
 site/mllib/index.html | 30 +-
 1 file changed, 21 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/a82adf04/site/mllib/index.html
--
diff --git a/site/mllib/index.html b/site/mllib/index.html
index 11808fb..e29228b 100644
--- a/site/mllib/index.html
+++ b/site/mllib/index.html
@@ -220,7 +220,7 @@
 data = spark.read.format("libsvm")\
.load("hdfs://...")
 
-model = KMeans(data, k=10)
+model = KMeans(k=10).fit(data)
   
   Calling MLlib in Python
 
@@ -270,25 +270,37 @@
   
 Algorithms
 
-  MLlib contains many algorithms and utilities, including:
+  MLlib contains many algorithms and utilities.
+
+
+  ML algorithms include:
 
 
   Classification: logistic regression, naive Bayes,...
-  Regression: generalized linear regression, isotonic 
regression,...
+  Regression: generalized linear regression, survival 
regression,...
   Decision trees, random forests, and gradient-boosted trees
   Recommendation: alternating least squares (ALS)
   Clustering: K-means, Gaussian mixtures (GMMs),...
   Topic modeling: latent Dirichlet allocation (LDA)
+  Frequent itemsets, association rules, and sequential pattern 
mining
+
+
+  ML workflow utilities include:
+
+
   Feature transformations: standardization, normalization, 
hashing,...
-  Model evaluation and hyper-parameter tuning
   ML Pipeline construction
+  Model evaluation and hyper-parameter tuning
   ML persistence: saving and loading models and Pipelines
-  Survival analysis: accelerated failure time model
-  Frequent itemset and sequential pattern mining: FP-growth, 
association rules, PrefixSpan
-  Distributed linear algebra: singular value decomposition (SVD), 
principal component analysis (PCA),...
+
+
+  Other utilities include:
+
+
+  Distributed linear algebra: SVD, PCA,...
   Statistics: summary statistics, hypothesis testing,...
 
-Refer to the MLlib guide 
for usage examples.
+Refer to the MLlib guide for 
usage examples.
   
 
   
@@ -315,7 +327,7 @@
 
 
   Download Spark. MLlib is included as a 
module.
-  Read the MLlib guide, 
which includes
+  Read the MLlib guide, which 
includes
   various usage examples.
   Learn how to deploy Spark on a cluster
 if you'd like to run in distributed mode. You can also run locally on 
a multicore machine





[1/2] spark-website git commit: updated MLlib site for 2.1

2016-12-09 Thread jkbradley
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 8d5d77c65 -> a82adf043


updated MLlib site for 2.1


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/057cad18
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/057cad18
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/057cad18

Branch: refs/heads/asf-site
Commit: 057cad18d2bfbc16b11837ca9614927444b7ac47
Parents: 8d5d77c
Author: Joseph K. Bradley 
Authored: Wed Dec 7 12:12:33 2016 -0800
Committer: Joseph K. Bradley 
Committed: Wed Dec 7 12:12:33 2016 -0800

--
 mllib/index.md | 30 +-
 1 file changed, 21 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/057cad18/mllib/index.md
--
diff --git a/mllib/index.md b/mllib/index.md
index 9c43750..bccd603 100644
--- a/mllib/index.md
+++ b/mllib/index.md
@@ -31,7 +31,7 @@ subproject: MLlib
 data = spark.read.format("libsvm")\
.load("hdfs://...")
 
-model = KMeans(data, k=10)
+model = KMeans(k=10).fit(data)
   
   Calling MLlib in Python
 
@@ -81,25 +81,37 @@ subproject: MLlib
   
 Algorithms
 
-  MLlib contains many algorithms and utilities, including:
+  MLlib contains many algorithms and utilities.
+
+
+  ML algorithms include:
 
 
   Classification: logistic regression, naive Bayes,...
-  Regression: generalized linear regression, isotonic 
regression,...
+  Regression: generalized linear regression, survival 
regression,...
   Decision trees, random forests, and gradient-boosted trees
   Recommendation: alternating least squares (ALS)
   Clustering: K-means, Gaussian mixtures (GMMs),...
   Topic modeling: latent Dirichlet allocation (LDA)
+  Frequent itemsets, association rules, and sequential pattern 
mining
+
+
+  ML workflow utilities include:
+
+
   Feature transformations: standardization, normalization, 
hashing,...
-  Model evaluation and hyper-parameter tuning
   ML Pipeline construction
+  Model evaluation and hyper-parameter tuning
   ML persistence: saving and loading models and Pipelines
-  Survival analysis: accelerated failure time model
-  Frequent itemset and sequential pattern mining: FP-growth, 
association rules, PrefixSpan
-  Distributed linear algebra: singular value decomposition (SVD), 
principal component analysis (PCA),...
+
+
+  Other utilities include:
+
+
+  Distributed linear algebra: SVD, PCA,...
   Statistics: summary statistics, hypothesis testing,...
 
-Refer to the MLlib guide for usage 
examples.
+Refer to the MLlib 
guide for usage examples.
   
 
   
@@ -126,7 +138,7 @@ subproject: MLlib
 
 
   Download Spark. MLlib 
is included as a module.
-  Read the MLlib guide, which 
includes
+  Read the MLlib 
guide, which includes
   various usage examples.
   Learn how to deploy Spark on 
a cluster
 if you'd like to run in distributed mode. You can also run locally on 
a multicore machine





spark git commit: [MINOR][SPARKR] Fix SparkR regex in copy command

2016-12-09 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master fd48d80a6 -> be5fc6ef7


[MINOR][SPARKR] Fix SparkR regex in copy command

Fix SparkR package copy regex. The existing code leads to
```
Copying release tarballs to 
/home//public_html/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-SNAPSHOT-2016_12_08_22_38-e8f351f-bin
mput: SparkR-*: no files found
```

Author: Shivaram Venkataraman 

Closes #16231 from shivaram/typo-sparkr-build.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be5fc6ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be5fc6ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be5fc6ef

Branch: refs/heads/master
Commit: be5fc6ef72c7eb586b184b0f42ac50ef32843208
Parents: fd48d80
Author: Shivaram Venkataraman 
Authored: Fri Dec 9 10:12:56 2016 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Dec 9 10:12:56 2016 -0800

--
 dev/create-release/release-build.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be5fc6ef/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index c0663b8..b08577c 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -252,7 +252,7 @@ if [[ "$1" == "package" ]]; then
   LFTP mkdir -p $dest_dir
   LFTP mput -O $dest_dir 'spark-*'
   LFTP mput -O $dest_dir 'pyspark-*'
-  LFTP mput -O $dest_dir 'SparkR-*'
+  LFTP mput -O $dest_dir 'SparkR_*'
   # Delete /latest directory and rename new upload to /latest
   LFTP "rm -r -f $REMOTE_PARENT_DIR/latest || exit 0"
   LFTP mv $dest_dir "$REMOTE_PARENT_DIR/latest"
@@ -260,7 +260,7 @@ if [[ "$1" == "package" ]]; then
   LFTP mkdir -p $dest_dir
   LFTP mput -O $dest_dir 'spark-*'
   LFTP mput -O $dest_dir 'pyspark-*'
-  LFTP mput -O $dest_dir 'SparkR-*'
+  LFTP mput -O $dest_dir 'SparkR_*'
   exit 0
 fi
 





spark git commit: [MINOR][SPARKR] Fix SparkR regex in copy command

2016-12-09 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0c6415aec -> eb2d9bfd4


[MINOR][SPARKR] Fix SparkR regex in copy command

Fix SparkR package copy regex. The existing code leads to
```
Copying release tarballs to 
/home//public_html/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-SNAPSHOT-2016_12_08_22_38-e8f351f-bin
mput: SparkR-*: no files found
```

Author: Shivaram Venkataraman 

Closes #16231 from shivaram/typo-sparkr-build.

(cherry picked from commit be5fc6ef72c7eb586b184b0f42ac50ef32843208)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb2d9bfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb2d9bfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb2d9bfd

Branch: refs/heads/branch-2.1
Commit: eb2d9bfd4e100789604ca0810929b42694ea7377
Parents: 0c6415a
Author: Shivaram Venkataraman 
Authored: Fri Dec 9 10:12:56 2016 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Dec 9 10:13:05 2016 -0800

--
 dev/create-release/release-build.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb2d9bfd/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index c0663b8..b08577c 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -252,7 +252,7 @@ if [[ "$1" == "package" ]]; then
   LFTP mkdir -p $dest_dir
   LFTP mput -O $dest_dir 'spark-*'
   LFTP mput -O $dest_dir 'pyspark-*'
-  LFTP mput -O $dest_dir 'SparkR-*'
+  LFTP mput -O $dest_dir 'SparkR_*'
   # Delete /latest directory and rename new upload to /latest
   LFTP "rm -r -f $REMOTE_PARENT_DIR/latest || exit 0"
   LFTP mv $dest_dir "$REMOTE_PARENT_DIR/latest"
@@ -260,7 +260,7 @@ if [[ "$1" == "package" ]]; then
   LFTP mkdir -p $dest_dir
   LFTP mput -O $dest_dir 'spark-*'
   LFTP mput -O $dest_dir 'pyspark-*'
-  LFTP mput -O $dest_dir 'SparkR-*'
+  LFTP mput -O $dest_dir 'SparkR_*'
   exit 0
 fi
 





spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

2016-12-09 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 44df6d2ce -> 65b4b0561


[SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

## What changes were proposed in this pull request?

* This PR changes `JVMObjectTracker` from an `object` to a `class` and associates
an instance with each `RBackend`, so we can manage the lifecycle of JVM objects
when there are multiple `RBackend` sessions. `RBackend.close` clears the object
tracker explicitly (a small usage sketch follows this list).
* I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, 
which could be wrong.
* Small refactor of `SerDe.sqlSerDe` to increase readability.
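
A minimal usage sketch of the per-instance tracking this enables (assuming the
`addAndGetId`/`get` methods shown in the diff below and the `clear` call
described above; `private[r]` visibility ignored):
```
// Each backend now owns its own tracker, so closing one session cannot
// release objects that another session still references.
val trackerA = new JVMObjectTracker
val trackerB = new JVMObjectTracker

val idA = trackerA.addAndGetId(new java.util.ArrayList[String]())
val idB = trackerB.addAndGetId(new java.util.HashMap[String, Integer]())

trackerA.clear()                     // e.g. when the first RBackend is closed
assert(trackerA.get(idA).isEmpty)    // its objects are gone...
assert(trackerB.get(idB).isDefined)  // ...while the other session is untouched
```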

## How was this patch tested?

* Added unit tests for `JVMObjectTracker`.
* Wait for Jenkins to run full tests.

Author: Xiangrui Meng 

Closes #16154 from mengxr/SPARK-17822.

(cherry picked from commit fd48d80a6145ea94f03e7fc6e4d724a0fbccac58)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65b4b056
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65b4b056
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65b4b056

Branch: refs/heads/branch-2.0
Commit: 65b4b05616bf8f5cf70a618cc15d379634e9b42d
Parents: 44df6d2
Author: Xiangrui Meng 
Authored: Fri Dec 9 07:51:46 2016 -0800
Committer: Xiangrui Meng 
Committed: Fri Dec 9 07:55:58 2016 -0800

--
 .../apache/spark/api/r/JVMObjectTracker.scala   | 87 ++
 .../scala/org/apache/spark/api/r/RBackend.scala |  4 +
 .../apache/spark/api/r/RBackendHandler.scala| 54 ++--
 .../scala/org/apache/spark/api/r/RRunner.scala  |  2 +-
 .../scala/org/apache/spark/api/r/SerDe.scala| 92 
 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 
 .../org/apache/spark/api/r/RBackendSuite.scala  | 31 +++
 .../org/apache/spark/sql/api/r/SQLUtils.scala   | 12 +--
 8 files changed, 264 insertions(+), 91 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/65b4b056/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala 
b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
new file mode 100644
index 000..3432700
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentHashMap
+
+/** JVM object ID wrapper */
+private[r] case class JVMObjectId(id: String) {
+  require(id != null, "Object ID cannot be null.")
+}
+
+/**
+ * Counter that tracks JVM objects returned to R.
+ * This is useful for referencing these objects in RPC calls.
+ */
+private[r] class JVMObjectTracker {
+
+  private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]()
+  private[this] val objCounter = new AtomicInteger()
+
+  /**
+   * Returns the JVM object associated with the input key or None if not found.
+   */
+  final def get(id: JVMObjectId): Option[Object] = this.synchronized {
+if (objMap.containsKey(id)) {
+  Some(objMap.get(id))
+} else {
+  None
+}
+  }
+
+  /**
+   * Returns the JVM object associated with the input key or throws an exception if not found.
+   */
+  @throws[NoSuchElementException]("if key does not exist.")
+  final def apply(id: JVMObjectId): Object = {
+get(id).getOrElse(
+  throw new NoSuchElementException(s"$id does not exist.")
+)
+  }
+
+  /**
+   * Adds a JVM object to track and returns assigned ID, which is unique 
within this tracker.
+   */
+  final def addAndGetId(obj: Object): JVMObjectId = {
+val id = JVMObjectId(objCounter.getAndIncrement().toString)
+objMap.put(id, obj)
+id
+  }
+
+  /**
+   * Removes and returns a JVM object with the 

spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

2016-12-09 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 b226f10e3 -> 0c6415aec


[SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

## What changes were proposed in this pull request?

* This PR changes `JVMObjectTracker` from an `object` to a `class` and 
associates one instance with each `RBackend`, so we can manage the lifecycle 
of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` 
clears the object tracker explicitly (a lifecycle sketch follows this list).
* I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, 
which could be wrong.
* Small refactor of `SerDe.sqlSerDe` to increase readability.
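
Here is a minimal sketch of the ownership the first bullet describes. It assumes the tracker also exposes a `clear()` method, which is not visible in the truncated diff below, and the `RBackendSessionSketch` class is illustrative rather than the actual `RBackend` code.

```scala
package org.apache.spark.api.r

// Illustrative: each backend session owns its own tracker, so JVM objects
// handed to one R session are isolated from other sessions and can all be
// released when that session closes.
class RBackendSessionSketch {

  // A per-instance tracker replaces the old JVM-wide singleton object.
  private[r] val jvmObjectTracker = new JVMObjectTracker

  def trackForR(obj: Object): JVMObjectId = jvmObjectTracker.addAndGetId(obj)

  def close(): Unit = {
    // Clear the tracker explicitly so all tracked references are dropped and
    // the underlying JVM objects become eligible for garbage collection.
    jvmObjectTracker.clear() // assumed method; not shown in the diff excerpt
  }
}
```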

## How was this patch tested?

* Added unit tests for `JVMObjectTracker`.
* Wait for Jenkins to run full tests.

Author: Xiangrui Meng 

Closes #16154 from mengxr/SPARK-17822.

(cherry picked from commit fd48d80a6145ea94f03e7fc6e4d724a0fbccac58)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c6415ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c6415ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c6415ae

Branch: refs/heads/branch-2.1
Commit: 0c6415aeca7a5c2fc5462c483c60d770f0236efe
Parents: b226f10
Author: Xiangrui Meng 
Authored: Fri Dec 9 07:51:46 2016 -0800
Committer: Xiangrui Meng 
Committed: Fri Dec 9 07:51:58 2016 -0800

--
 .../apache/spark/api/r/JVMObjectTracker.scala   | 87 ++
 .../scala/org/apache/spark/api/r/RBackend.scala |  6 +-
 .../apache/spark/api/r/RBackendHandler.scala| 54 ++--
 .../scala/org/apache/spark/api/r/RRunner.scala  |  2 +-
 .../scala/org/apache/spark/api/r/SerDe.scala| 92 
 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 
 .../org/apache/spark/api/r/RBackendSuite.scala  | 31 +++
 .../org/apache/spark/sql/api/r/SQLUtils.scala   | 12 +--
 8 files changed, 265 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c6415ae/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala 
b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
new file mode 100644
index 000..3432700
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentHashMap
+
+/** JVM object ID wrapper */
+private[r] case class JVMObjectId(id: String) {
+  require(id != null, "Object ID cannot be null.")
+}
+
+/**
+ * Counter that tracks JVM objects returned to R.
+ * This is useful for referencing these objects in RPC calls.
+ */
+private[r] class JVMObjectTracker {
+
+  private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]()
+  private[this] val objCounter = new AtomicInteger()
+
+  /**
+   * Returns the JVM object associated with the input key or None if not found.
+   */
+  final def get(id: JVMObjectId): Option[Object] = this.synchronized {
+if (objMap.containsKey(id)) {
+  Some(objMap.get(id))
+} else {
+  None
+}
+  }
+
+  /**
+   * Returns the JVM object associated with the input key or throws an 
exception if not found.
+   */
+  @throws[NoSuchElementException]("if key does not exist.")
+  final def apply(id: JVMObjectId): Object = {
+get(id).getOrElse(
+  throw new NoSuchElementException(s"$id does not exist.")
+)
+  }
+
+  /**
+   * Adds a JVM object to track and returns assigned ID, which is unique 
within this tracker.
+   */
+  final def addAndGetId(obj: Object): JVMObjectId = {
+val id = JVMObjectId(objCounter.getAndIncrement().toString)
+objMap.put(id, obj)
+id
+  }
+
+  /**
+   * Removes and returns a JVM object with the 

spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

2016-12-09 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master b162cc0c2 -> fd48d80a6


[SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend

## What changes were proposed in this pull request?

* This PR changes `JVMObjectTracker` from an `object` to a `class` and 
associates one instance with each `RBackend`, so we can manage the lifecycle 
of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` 
clears the object tracker explicitly (a sketch of resolving tracked objects 
during RPC calls follows this list).
* I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, 
which could be wrong.
* Small refactor of `SerDe.sqlSerDe` to increase readability.
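
Below is a minimal sketch of how an object id coming back from R might be resolved through the tracker during an RPC call, using the API in the diff that follows. The `invokeOn` helper and the reflection-based dispatch are illustrative, not the actual `RBackendHandler` logic.

```scala
package org.apache.spark.api.r

// Illustrative: the id's string form travels over the R <-> JVM wire; the
// handler wraps it back into a JVMObjectId and looks up the target instance.
object RpcDispatchSketch {

  def invokeOn(tracker: JVMObjectTracker, rawId: String, method: String): Object = {
    // apply() throws NoSuchElementException if R refers to an unknown object.
    val target = tracker(JVMObjectId(rawId))
    // Reflection stands in for the real argument-matching dispatch.
    target.getClass.getMethod(method).invoke(target)
  }

  def main(args: Array[String]): Unit = {
    val tracker = new JVMObjectTracker
    val id = tracker.addAndGetId(new java.lang.StringBuilder("hello"))
    println(invokeOn(tracker, id.id, "toString")) // prints "hello"
  }
}
```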

## How was this patch tested?

* Added unit tests for `JVMObjectTracker`.
* Wait for Jenkins to run full tests.

Author: Xiangrui Meng 

Closes #16154 from mengxr/SPARK-17822.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd48d80a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd48d80a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd48d80a

Branch: refs/heads/master
Commit: fd48d80a6145ea94f03e7fc6e4d724a0fbccac58
Parents: b162cc0
Author: Xiangrui Meng 
Authored: Fri Dec 9 07:51:46 2016 -0800
Committer: Xiangrui Meng 
Committed: Fri Dec 9 07:51:46 2016 -0800

--
 .../apache/spark/api/r/JVMObjectTracker.scala   | 87 ++
 .../scala/org/apache/spark/api/r/RBackend.scala |  6 +-
 .../apache/spark/api/r/RBackendHandler.scala| 54 ++--
 .../scala/org/apache/spark/api/r/RRunner.scala  |  2 +-
 .../scala/org/apache/spark/api/r/SerDe.scala| 92 
 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 
 .../org/apache/spark/api/r/RBackendSuite.scala  | 31 +++
 .../org/apache/spark/sql/api/r/SQLUtils.scala   | 12 +--
 8 files changed, 265 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fd48d80a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala 
b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
new file mode 100644
index 000..3432700
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentHashMap
+
+/** JVM object ID wrapper */
+private[r] case class JVMObjectId(id: String) {
+  require(id != null, "Object ID cannot be null.")
+}
+
+/**
+ * Counter that tracks JVM objects returned to R.
+ * This is useful for referencing these objects in RPC calls.
+ */
+private[r] class JVMObjectTracker {
+
+  private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]()
+  private[this] val objCounter = new AtomicInteger()
+
+  /**
+   * Returns the JVM object associated with the input key or None if not found.
+   */
+  final def get(id: JVMObjectId): Option[Object] = this.synchronized {
+if (objMap.containsKey(id)) {
+  Some(objMap.get(id))
+} else {
+  None
+}
+  }
+
+  /**
+   * Returns the JVM object associated with the input key or throws an 
exception if not found.
+   */
+  @throws[NoSuchElementException]("if key does not exist.")
+  final def apply(id: JVMObjectId): Object = {
+get(id).getOrElse(
+  throw new NoSuchElementException(s"$id does not exist.")
+)
+  }
+
+  /**
+   * Adds a JVM object to track and returns assigned ID, which is unique 
within this tracker.
+   */
+  final def addAndGetId(obj: Object): JVMObjectId = {
+val id = JVMObjectId(objCounter.getAndIncrement().toString)
+objMap.put(id, obj)
+id
+  }
+
+  /**
+   * Removes and returns a JVM object with the specific ID from the tracker, 
or None if not found.
+   */
+  final def remove(id: JVMObjectId): Option[Object] = this.synchronized {

spark git commit: [MINOR][CORE][SQL][DOCS] Typo fixes

2016-12-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 72bf51997 -> b226f10e3


[MINOR][CORE][SQL][DOCS] Typo fixes

## What changes were proposed in this pull request?

Typo fixes

## How was this patch tested?

Local build. Awaiting the official build.

Author: Jacek Laskowski 

Closes #16144 from jaceklaskowski/typo-fixes.

(cherry picked from commit b162cc0c2810c1a9fa2eee8e664ffae84f9eea11)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b226f10e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b226f10e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b226f10e

Branch: refs/heads/branch-2.1
Commit: b226f10e3df8b789da6ef820b256f994b178fbbe
Parents: 72bf519
Author: Jacek Laskowski 
Authored: Fri Dec 9 18:45:57 2016 +0800
Committer: Sean Owen 
Committed: Fri Dec 9 18:46:32 2016 +0800

--
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala| 2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala| 4 ++--
 .../apache/spark/deploy/history/HistoryServerArguments.scala   | 2 +-
 .../main/scala/org/apache/spark/internal/config/package.scala  | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala  | 2 +-
 docs/monitoring.md | 6 ++
 .../main/java/org/apache/spark/sql/streaming/OutputMode.java   | 2 +-
 .../main/scala/org/apache/spark/sql/catalyst/InternalRow.scala | 2 +-
 .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala| 2 +-
 .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 6 +++---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 2 +-
 12 files changed, 16 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 7f8f0f5..6f5c31d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -322,7 +322,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
   if (minSizeForBroadcast > maxRpcMessageSize) {
 val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast 
($minSizeForBroadcast bytes) must " +
   s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent 
sending an rpc " +
-  "message that is to large."
+  "message that is too large."
 logError(msg)
 throw new IllegalArgumentException(msg)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8f8392f..b6aeeb9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2567,8 +2567,8 @@ object SparkContext extends Logging {
 val serviceLoaders =
   ServiceLoader.load(classOf[ExternalClusterManager], 
loader).asScala.filter(_.canCreate(url))
 if (serviceLoaders.size > 1) {
-  throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) 
registered " +
-  s"for the url $url:")
+  throw new SparkException(
+s"Multiple external cluster managers registered for the url $url: 
$serviceLoaders")
 }
 serviceLoaders.headOption
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 2eddb5f..080ba12 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
 /**
- * Command-line parser for the master.
+ * Command-line parser for the [[HistoryServer]].
  */
 private[history] class HistoryServerArguments(conf: SparkConf, args: 
Array[String])
   extends Logging {


spark git commit: [MINOR][CORE][SQL][DOCS] Typo fixes

2016-12-09 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 67587d961 -> b162cc0c2


[MINOR][CORE][SQL][DOCS] Typo fixes

## What changes were proposed in this pull request?

Typo fixes

## How was this patch tested?

Local build. Awaiting the official build.

Author: Jacek Laskowski 

Closes #16144 from jaceklaskowski/typo-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b162cc0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b162cc0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b162cc0c

Branch: refs/heads/master
Commit: b162cc0c2810c1a9fa2eee8e664ffae84f9eea11
Parents: 67587d9
Author: Jacek Laskowski 
Authored: Fri Dec 9 18:45:57 2016 +0800
Committer: Sean Owen 
Committed: Fri Dec 9 18:45:57 2016 +0800

--
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala| 2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala| 4 ++--
 .../apache/spark/deploy/history/HistoryServerArguments.scala   | 2 +-
 .../main/scala/org/apache/spark/internal/config/package.scala  | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala  | 2 +-
 docs/monitoring.md | 6 ++
 .../main/java/org/apache/spark/sql/streaming/OutputMode.java   | 2 +-
 .../main/scala/org/apache/spark/sql/catalyst/InternalRow.scala | 2 +-
 .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala| 2 +-
 .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 6 +++---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 2 +-
 12 files changed, 16 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 7f8f0f5..6f5c31d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -322,7 +322,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
   if (minSizeForBroadcast > maxRpcMessageSize) {
 val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast 
($minSizeForBroadcast bytes) must " +
   s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent 
sending an rpc " +
-  "message that is to large."
+  "message that is too large."
 logError(msg)
 throw new IllegalArgumentException(msg)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b42820a..02c009c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2570,8 +2570,8 @@ object SparkContext extends Logging {
 val serviceLoaders =
   ServiceLoader.load(classOf[ExternalClusterManager], 
loader).asScala.filter(_.canCreate(url))
 if (serviceLoaders.size > 1) {
-  throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) 
registered " +
-  s"for the url $url:")
+  throw new SparkException(
+s"Multiple external cluster managers registered for the url $url: 
$serviceLoaders")
 }
 serviceLoaders.headOption
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 2eddb5f..080ba12 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
 /**
- * Command-line parser for the master.
+ * Command-line parser for the [[HistoryServer]].
  */
 private[history] class HistoryServerArguments(conf: SparkConf, args: 
Array[String])
   extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git 

spark git commit: [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic

2016-12-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2c88e1dc3 -> 72bf51997


[SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic

Make stateful UDFs nondeterministic.

Added new test cases with both stateful and stateless UDFs.
Without the patch, the test cases throw the following exception:

1 did not equal 10
ScalaTestFailureLocation: org.apache.spark.sql.hive.execution.HiveUDFSuite$$anonfun$21 at (HiveUDFSuite.scala:501)
org.scalatest.exceptions.TestFailedException: 1 did not equal 10
    at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
    at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
...
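
For context, a Hive UDF becomes stateful by carrying mutable state across rows and declaring `stateful = true` in its `@UDFType` annotation. Below is a minimal sketch of such a UDF, roughly the shape the new test's `StatefulUDF` appears to have; the test class itself is truncated in this excerpt, so this is a hedged reconstruction, not the committed code.

```scala
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.udf.UDFType
import org.apache.hadoop.io.LongWritable

// Illustrative reconstruction: a running row counter. Although the annotation
// says deterministic = true, stateful = true means each evaluate() call
// mutates state, so folding the call to a constant would yield 1 instead of
// the row count -- exactly the "1 did not equal 10" failure above.
@UDFType(deterministic = true, stateful = true)
class StatefulCountSketch extends UDF {
  private val result = new LongWritable(0)

  def evaluate(): LongWritable = {
    result.set(result.get() + 1)
    result
  }
}
```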

Author: Zhan Zhang 

Closes #16068 from zhzhan/state.

(cherry picked from commit 67587d961d5f94a8639c20cb80127c86bf79d5a8)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72bf5199
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72bf5199
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72bf5199

Branch: refs/heads/branch-2.1
Commit: 72bf5199738c7ab0361b2b55eb4f4299048a21fa
Parents: 2c88e1d
Author: Zhan Zhang 
Authored: Fri Dec 9 16:35:06 2016 +0800
Committer: Wenchen Fan 
Committed: Fri Dec 9 16:36:35 2016 +0800

--
 .../org/apache/spark/sql/hive/hiveUDFs.scala|  4 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 45 +++-
 2 files changed, 45 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72bf5199/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index e30e0f9..37414ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -59,7 +59,7 @@ private[hive] case class HiveSimpleUDF(
   @transient
   private lazy val isUDFDeterministic = {
 val udfType = function.getClass().getAnnotation(classOf[HiveUDFType])
-udfType != null && udfType.deterministic()
+udfType != null && udfType.deterministic() && !udfType.stateful()
   }
 
   override def foldable: Boolean = isUDFDeterministic && 
children.forall(_.foldable)
@@ -142,7 +142,7 @@ private[hive] case class HiveGenericUDF(
   @transient
   private lazy val isUDFDeterministic = {
 val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
-udfType != null && udfType.deterministic()
+udfType != null && udfType.deterministic() && !udfType.stateful()
   }
 
   @transient

http://git-wip-us.apache.org/repos/asf/spark/blob/72bf5199/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 48adc83..4098bb5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -21,15 +21,17 @@ import java.io.{DataInput, DataOutput, File, PrintWriter}
 import java.util.{ArrayList, Arrays, Properties}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.ql.udf.UDAFPercentile
+import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
 import org.apache.hadoop.hive.ql.udf.generic._
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, 
ObjectInspectorFactory}
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
-import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.{LongWritable, Writable}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
 assert(count4 == 1)
 sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+  sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS 

spark git commit: [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic

2016-12-09 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c074c96dc -> 67587d961


[SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic

## What changes were proposed in this pull request?

Make stateful UDFs nondeterministic.

## How was this patch tested?
Added new test cases with both stateful and stateless UDFs.
Without the patch, the test cases throw the following exception:

1 did not equal 10
ScalaTestFailureLocation: org.apache.spark.sql.hive.execution.HiveUDFSuite$$anonfun$21 at (HiveUDFSuite.scala:501)
org.scalatest.exceptions.TestFailedException: 1 did not equal 10
    at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
    at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
...
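
Here is a sketch of the query pattern the test exercises, under stated assumptions: `StatefulCountSketch` stands in for the registered stateful UDF, the 10-row view name is illustrative, and Hive support is available on the classpath. Before the patch the planner could treat the UDF as deterministic and fold or reuse its result, so the aggregate saw 1; with `!udfType.stateful()` added to the determinism check it is evaluated once per row and the result is 10.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only; class and table names are assumptions, not the test's.
object StatefulUdfQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stateful-udf-sketch")
      .master("local[1]")  // one task, so a single UDF instance sees every row
      .enableHiveSupport() // needed for CREATE TEMPORARY FUNCTION ... AS '<class>'
      .getOrCreate()

    spark.range(10).createOrReplaceTempView("ten_rows")
    spark.sql(
      "CREATE TEMPORARY FUNCTION statefulUDF AS 'some.pkg.StatefulCountSketch'")

    // With the fix, the stateful UDF runs once per row, so the maximum of its
    // running count over 10 rows is 10; without it the plan may fold the call
    // and the aggregate sees 1.
    val result = spark.sql("SELECT max(statefulUDF()) FROM ten_rows").head().getLong(0)
    println(result) // expected: 10
  }
}
```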

Author: Zhan Zhang 

Closes #16068 from zhzhan/state.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67587d96
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67587d96
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67587d96

Branch: refs/heads/master
Commit: 67587d961d5f94a8639c20cb80127c86bf79d5a8
Parents: c074c96
Author: Zhan Zhang 
Authored: Fri Dec 9 16:35:06 2016 +0800
Committer: Wenchen Fan 
Committed: Fri Dec 9 16:35:06 2016 +0800

--
 .../org/apache/spark/sql/hive/hiveUDFs.scala|  4 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 45 +++-
 2 files changed, 45 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/67587d96/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 349faae..26dc372 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -61,7 +61,7 @@ private[hive] case class HiveSimpleUDF(
   @transient
   private lazy val isUDFDeterministic = {
 val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
-udfType != null && udfType.deterministic()
+udfType != null && udfType.deterministic() && !udfType.stateful()
   }
 
   override def foldable: Boolean = isUDFDeterministic && 
children.forall(_.foldable)
@@ -144,7 +144,7 @@ private[hive] case class HiveGenericUDF(
   @transient
   private lazy val isUDFDeterministic = {
 val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
-udfType != null && udfType.deterministic()
+udfType != null && udfType.deterministic() && !udfType.stateful()
   }
 
   @transient

http://git-wip-us.apache.org/repos/asf/spark/blob/67587d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 48adc83..4098bb5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -21,15 +21,17 @@ import java.io.{DataInput, DataOutput, File, PrintWriter}
 import java.util.{ArrayList, Arrays, Properties}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.ql.udf.UDAFPercentile
+import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
 import org.apache.hadoop.hive.ql.udf.generic._
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, 
ObjectInspectorFactory}
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
-import org.apache.hadoop.io.Writable
+import org.apache.hadoop.io.{LongWritable, Writable}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
 assert(count4 == 1)
 sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+  sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS 
'${classOf[StatefulUDF].getName}'")
+  sql(s"CREATE TEMPORARY