spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread
Repository: spark Updated Branches: refs/heads/branch-2.1 8bf56cc46 -> b020ce408 [SPARK-18811] StreamSource resolution should happen in stream execution thread ## What changes were proposed in this pull request? When you start a stream, if we are trying to resolve the source of the stream, for example if we need to resolve partition columns, this could take a long time. This long execution time should not block the main thread where `query.start()` was called on. It should happen in the stream execution thread possibly before starting any triggers. ## How was this patch tested? Unit test added. Made sure test fails with no code changes. Author: Burak YavuzCloses #16238 from brkyvz/SPARK-18811. (cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40 Branch: refs/heads/branch-2.1 Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e Parents: 8bf56cc Author: Burak Yavuz Authored: Fri Dec 9 22:49:51 2016 -0800 Committer: Shixiong Zhu Committed: Fri Dec 9 22:50:10 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 24 ++- .../sql/streaming/StreamingQueryManager.scala | 14 + .../streaming/StreamingQueryManagerSuite.scala | 28 + .../sql/streaming/util/DefaultSource.scala | 66 4 files changed, 116 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39be222..b52810d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -47,7 +47,7 @@ class StreamExecution( override val sparkSession: SparkSession, override val name: String, checkpointRoot: String, -val logicalPlan: LogicalPlan, +analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, @@ -115,12 +115,26 @@ class StreamExecution( private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + override lazy val logicalPlan: LogicalPlan = { +var nextSourceId = 0L +analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} + } + /** All stream sources present in the query plan. */ - protected val sources = + protected lazy val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. 
*/ - private val uniqueSources = sources.distinct + private lazy val uniqueSources = sources.distinct private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -214,6 +228,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SparkSession.setActiveSession(sparkSession) + updateStatusMessage("Initializing sources") + // force initialization of the logical plan so that the sources can be created + logicalPlan + triggerExecutor.execute(() => { startTrigger() http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c6ab416..52d0791 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++
spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread
Repository: spark Updated Branches: refs/heads/master 3e11d5bfe -> 63c915987 [SPARK-18811] StreamSource resolution should happen in stream execution thread ## What changes were proposed in this pull request? When you start a stream, if we are trying to resolve the source of the stream, for example if we need to resolve partition columns, this could take a long time. This long execution time should not block the main thread where `query.start()` was called on. It should happen in the stream execution thread possibly before starting any triggers. ## How was this patch tested? Unit test added. Made sure test fails with no code changes. Author: Burak YavuzCloses #16238 from brkyvz/SPARK-18811. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598 Branch: refs/heads/master Commit: 63c9159870ee274c68e24360594ca01d476b9ace Parents: 3e11d5b Author: Burak Yavuz Authored: Fri Dec 9 22:49:51 2016 -0800 Committer: Shixiong Zhu Committed: Fri Dec 9 22:49:51 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 24 ++- .../sql/streaming/StreamingQueryManager.scala | 14 + .../streaming/StreamingQueryManagerSuite.scala | 28 + .../sql/streaming/util/DefaultSource.scala | 66 4 files changed, 116 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39be222..b52810d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -47,7 +47,7 @@ class StreamExecution( override val sparkSession: SparkSession, override val name: String, checkpointRoot: String, -val logicalPlan: LogicalPlan, +analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, @@ -115,12 +115,26 @@ class StreamExecution( private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + override lazy val logicalPlan: LogicalPlan = { +var nextSourceId = 0L +analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} + } + /** All stream sources present in the query plan. */ - protected val sources = + protected lazy val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ - private val uniqueSources = sources.distinct + private lazy val uniqueSources = sources.distinct private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -214,6 +228,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. 
SparkSession.setActiveSession(sparkSession) + updateStatusMessage("Initializing sources") + // force initialization of the logical plan so that the sources can be created + logicalPlan + triggerExecutor.execute(() => { startTrigger() http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c6ab416..52d0791 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
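For readers skimming the diff above, the essence of the change is that the analyzed plan is wrapped in a `lazy val logicalPlan` that materializes the sources, and the stream-execution thread forces that lazy val ("Initializing sources") before the first trigger, so source resolution no longer blocks the thread that called `query.start()`. The following is a minimal, self-contained sketch of that pattern only; `QuerySketch`, `resolveSources`, and the printed messages are illustrative stand-ins, not Spark APIs.

```scala
// Sketch of deferring expensive resolution to the runner thread via a lazy val.
object LazyResolutionSketch {

  class QuerySketch(resolveSources: () => Seq[String]) {
    // Not evaluated until first access; the first access happens on the runner thread.
    private lazy val sources: Seq[String] = resolveSources()

    def start(): Thread = {
      val runner = new Thread(new Runnable {
        override def run(): Unit = {
          // Force initialization of the (possibly slow) sources here,
          // before the trigger loop, instead of on the caller's thread.
          println(s"resolved sources: ${sources.mkString(", ")}")
          // ... trigger loop would start here ...
        }
      })
      runner.setDaemon(true)
      runner.start()
      runner // returns without waiting for resolution
    }
  }

  def main(args: Array[String]): Unit = {
    val query = new QuerySketch(() => {
      Thread.sleep(2000) // stand-in for slow partition-column discovery
      Seq("source-0")
    })
    val t = query.start()
    println("start() returned immediately")
    t.join()
  }
}
```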
spark git commit: [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values
Repository: spark Updated Branches: refs/heads/branch-2.1 e45345d91 -> 8bf56cc46 [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values ## What changes were proposed in this pull request? Several SparkR API calling into JVM methods that have void return values are getting printed out, especially when running in a REPL or IDE. example: ``` > setLogLevel("WARN") NULL ``` We should fix this to make the result more clear. Also found a small change to return value of dropTempView in 2.1 - adding doc and test for it. ## How was this patch tested? manually - I didn't find a expect_*() method in testthat for this Author: Felix CheungCloses #16237 from felixcheung/rinvis. (cherry picked from commit 3e11d5bfef2f05bd6d42c4d6188eae6d63c963ef) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bf56cc4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bf56cc4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bf56cc4 Branch: refs/heads/branch-2.1 Commit: 8bf56cc46b96874565ebd8109f62e69e6c0cf151 Parents: e45345d Author: Felix Cheung Authored: Fri Dec 9 19:06:05 2016 -0800 Committer: Shivaram Venkataraman Committed: Fri Dec 9 19:06:28 2016 -0800 -- R/pkg/R/SQLContext.R | 7 --- R/pkg/R/context.R | 6 +++--- R/pkg/R/sparkR.R | 6 +++--- R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++--- 4 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 38d83c6..6f48cd6 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -634,7 +634,7 @@ tableNames <- function(x, ...) { cacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "cacheTable", tableName) + invisible(callJMethod(catalog, "cacheTable", tableName)) } cacheTable <- function(x, ...) { @@ -663,7 +663,7 @@ cacheTable <- function(x, ...) { uncacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "uncacheTable", tableName) + invisible(callJMethod(catalog, "uncacheTable", tableName)) } uncacheTable <- function(x, ...) { @@ -686,7 +686,7 @@ uncacheTable <- function(x, ...) { clearCache.default <- function() { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "clearCache") + invisible(callJMethod(catalog, "clearCache")) } clearCache <- function() { @@ -730,6 +730,7 @@ dropTempTable <- function(x, ...) { #' If the view has been cached before, then it will also be uncached. #' #' @param viewName the name of the view to be dropped. +#' @return TRUE if the view is dropped successfully, FALSE otherwise. #' @rdname dropTempView #' @name dropTempView #' @export http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 438d77a..1138caf 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -87,8 +87,8 @@ objectFile <- function(sc, path, minPartitions = NULL) { #' in the list are split into \code{numSlices} slices and distributed to nodes #' in the cluster. #' -#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function -#' will write it to disk and send the file name to JVM. 
Also to make sure each slice is not +#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function +#' will write it to disk and send the file name to JVM. Also to make sure each slice is not #' larger than that limit, number of slices may be increased. #' #' @param sc SparkContext to use @@ -379,5 +379,5 @@ spark.lapply <- function(list, func) { #' @note setLogLevel since 2.0.0 setLogLevel <- function(level) { sc <- getSparkContext() - callJMethod(sc, "setLogLevel", level) + invisible(callJMethod(sc, "setLogLevel", level)) } http://git-wip-us.apache.org/repos/asf/spark/blob/8bf56cc4/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 43bff97..c57cc8f 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -427,7 +427,7 @@ sparkR.session <- function( #' @method setJobGroup
spark git commit: [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values
Repository: spark Updated Branches: refs/heads/master d2493a203 -> 3e11d5bfe [SPARK-18807][SPARKR] Should suppress output print for calls to JVM methods with void return values ## What changes were proposed in this pull request? Several SparkR API calling into JVM methods that have void return values are getting printed out, especially when running in a REPL or IDE. example: ``` > setLogLevel("WARN") NULL ``` We should fix this to make the result more clear. Also found a small change to return value of dropTempView in 2.1 - adding doc and test for it. ## How was this patch tested? manually - I didn't find a expect_*() method in testthat for this Author: Felix CheungCloses #16237 from felixcheung/rinvis. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e11d5bf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e11d5bf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e11d5bf Branch: refs/heads/master Commit: 3e11d5bfef2f05bd6d42c4d6188eae6d63c963ef Parents: d2493a2 Author: Felix Cheung Authored: Fri Dec 9 19:06:05 2016 -0800 Committer: Shivaram Venkataraman Committed: Fri Dec 9 19:06:05 2016 -0800 -- R/pkg/R/SQLContext.R | 7 --- R/pkg/R/context.R | 6 +++--- R/pkg/R/sparkR.R | 6 +++--- R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++--- 4 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 38d83c6..6f48cd6 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -634,7 +634,7 @@ tableNames <- function(x, ...) { cacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "cacheTable", tableName) + invisible(callJMethod(catalog, "cacheTable", tableName)) } cacheTable <- function(x, ...) { @@ -663,7 +663,7 @@ cacheTable <- function(x, ...) { uncacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "uncacheTable", tableName) + invisible(callJMethod(catalog, "uncacheTable", tableName)) } uncacheTable <- function(x, ...) { @@ -686,7 +686,7 @@ uncacheTable <- function(x, ...) { clearCache.default <- function() { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "clearCache") + invisible(callJMethod(catalog, "clearCache")) } clearCache <- function() { @@ -730,6 +730,7 @@ dropTempTable <- function(x, ...) { #' If the view has been cached before, then it will also be uncached. #' #' @param viewName the name of the view to be dropped. +#' @return TRUE if the view is dropped successfully, FALSE otherwise. #' @rdname dropTempView #' @name dropTempView #' @export http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 438d77a..1138caf 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -87,8 +87,8 @@ objectFile <- function(sc, path, minPartitions = NULL) { #' in the list are split into \code{numSlices} slices and distributed to nodes #' in the cluster. #' -#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function -#' will write it to disk and send the file name to JVM. 
Also to make sure each slice is not +#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function +#' will write it to disk and send the file name to JVM. Also to make sure each slice is not #' larger than that limit, number of slices may be increased. #' #' @param sc SparkContext to use @@ -379,5 +379,5 @@ spark.lapply <- function(list, func) { #' @note setLogLevel since 2.0.0 setLogLevel <- function(level) { sc <- getSparkContext() - callJMethod(sc, "setLogLevel", level) + invisible(callJMethod(sc, "setLogLevel", level)) } http://git-wip-us.apache.org/repos/asf/spark/blob/3e11d5bf/R/pkg/R/sparkR.R -- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 43bff97..c57cc8f 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -427,7 +427,7 @@ sparkR.session <- function( #' @method setJobGroup default setJobGroup.default <- function(groupId, description, interruptOnCancel) { sc <- getSparkContext() - callJMethod(sc,
spark git commit: [SPARK-18812][MLLIB] explain "Spark ML"
Repository: spark Updated Branches: refs/heads/branch-2.1 562507ef0 -> e45345d91 [SPARK-18812][MLLIB] explain "Spark ML" ## What changes were proposed in this pull request? There has been some confusion around "Spark ML" vs. "MLlib". This PR adds some FAQ-like entries to the MLlib user guide to explain "Spark ML" and reduce the confusion. I check the [Spark FAQ page](http://spark.apache.org/faq.html), which seems too high-level for the content here. So I added it to the MLlib user guide instead. cc: mateiz Author: Xiangrui MengCloses #16241 from mengxr/SPARK-18812. (cherry picked from commit d2493a203e852adf63dde4e1fc993e8d11efec3d) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e45345d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e45345d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e45345d9 Branch: refs/heads/branch-2.1 Commit: e45345d91e333e0b5f9219e857affeda461863c6 Parents: 562507e Author: Xiangrui Meng Authored: Fri Dec 9 17:34:52 2016 -0800 Committer: Xiangrui Meng Committed: Fri Dec 9 17:34:58 2016 -0800 -- docs/ml-guide.md | 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e45345d9/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index ddf81be..9717619 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -35,6 +35,18 @@ The primary Machine Learning API for Spark is now the [DataFrame](sql-programmin * The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages. * DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the [Pipelines guide](ml-pipeline.html) for details. +*What is "Spark ML"?* + +* "Spark ML" is not an official name but occasionally used to refer to the MLlib DataFrame-based API. + This is majorly due to the `org.apache.spark.ml` Scala package name used by the DataFrame-based API, + and the "Spark ML Pipelines" term we used initially to emphasize the pipeline concept. + +*Is MLlib deprecated?* + +* No. MLlib includes both the RDD-based API and the DataFrame-based API. + The RDD-based API is now in maintenance mode. + But neither API is deprecated, nor MLlib as a whole. + # Dependencies MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/), which depends on - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18812][MLLIB] explain "Spark ML"
Repository: spark Updated Branches: refs/heads/master cf33a8628 -> d2493a203 [SPARK-18812][MLLIB] explain "Spark ML" ## What changes were proposed in this pull request? There has been some confusion around "Spark ML" vs. "MLlib". This PR adds some FAQ-like entries to the MLlib user guide to explain "Spark ML" and reduce the confusion. I check the [Spark FAQ page](http://spark.apache.org/faq.html), which seems too high-level for the content here. So I added it to the MLlib user guide instead. cc: mateiz Author: Xiangrui MengCloses #16241 from mengxr/SPARK-18812. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2493a20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2493a20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2493a20 Branch: refs/heads/master Commit: d2493a203e852adf63dde4e1fc993e8d11efec3d Parents: cf33a86 Author: Xiangrui Meng Authored: Fri Dec 9 17:34:52 2016 -0800 Committer: Xiangrui Meng Committed: Fri Dec 9 17:34:52 2016 -0800 -- docs/ml-guide.md | 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2493a20/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index ddf81be..9717619 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -35,6 +35,18 @@ The primary Machine Learning API for Spark is now the [DataFrame](sql-programmin * The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages. * DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the [Pipelines guide](ml-pipeline.html) for details. +*What is "Spark ML"?* + +* "Spark ML" is not an official name but occasionally used to refer to the MLlib DataFrame-based API. + This is majorly due to the `org.apache.spark.ml` Scala package name used by the DataFrame-based API, + and the "Spark ML Pipelines" term we used initially to emphasize the pipeline concept. + +*Is MLlib deprecated?* + +* No. MLlib includes both the RDD-based API and the DataFrame-based API. + The RDD-based API is now in maintenance mode. + But neither API is deprecated, nor MLlib as a whole. + # Dependencies MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/), which depends on - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4105] retry the fetch or stage if shuffle block is corrupt
Repository: spark Updated Branches: refs/heads/master d60ab5fd9 -> cf33a8628 [SPARK-4105] retry the fetch or stage if shuffle block is corrupt ## What changes were proposed in this pull request? There is an outstanding issue that existed for a long time: Sometimes the shuffle blocks are corrupt and can't be decompressed. We recently hit this in three different workloads, sometimes we can reproduce it by every try, sometimes can't. I also found that when the corruption happened, the beginning and end of the blocks are correct, the corruption happen in the middle. There was one case that the string of block id is corrupt by one character. It seems that it's very likely the corruption is introduced by some weird machine/hardware, also the checksum (16 bits) in TCP is not strong enough to identify all the corruption. Unfortunately, Spark does not have checksum for shuffle blocks or broadcast, the job will fail if any corruption happen in the shuffle block from disk, or broadcast blocks during network. This PR try to detect the corruption after fetching shuffle blocks by decompressing them, because most of the compression already have checksum in them. It will retry the block, or failed with FetchFailure, so the previous stage could be retried on different (still random) machines. Checksum for broadcast will be added by another PR. ## How was this patch tested? Added unit tests Author: Davies LiuCloses #15923 from davies/detect_corrupt. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf33a862 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf33a862 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf33a862 Branch: refs/heads/master Commit: cf33a86285629abe72c1acf235b8bfa6057220a8 Parents: d60ab5f Author: Davies Liu Authored: Fri Dec 9 15:44:22 2016 -0800 Committer: Shixiong Zhu Committed: Fri Dec 9 15:44:22 2016 -0800 -- .../spark/shuffle/BlockStoreShuffleReader.scala | 13 +- .../storage/ShuffleBlockFetcherIterator.scala | 133 +- .../spark/util/io/ChunkedByteBuffer.scala | 2 +- .../ShuffleBlockFetcherIteratorSuite.scala | 172 ++- 4 files changed, 263 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cf33a862/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index b9d8349..8b2e26c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -42,24 +42,21 @@ private[spark] class BlockStoreShuffleReader[K, C]( /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { -val blockFetcherItr = new ShuffleBlockFetcherIterator( +val wrappedStreams = new ShuffleBlockFetcherIterator( context, blockManager.shuffleClient, blockManager, mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), + serializerManager.wrapStream, // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, - SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) - -// Wrap the streams for compression and encryption based on configuration -val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => - 
serializerManager.wrapStream(blockId, inputStream) -} + SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue), + SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true)) val serializerInstance = dep.serializer.newInstance() // Create a key/value iterator for each stream -val recordIter = wrappedStreams.flatMap { wrappedStream => +val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) => // Note: the asKeyValueIterator below wraps a key/value iterator inside of a // NextIterator. The NextIterator makes sure that close() is called on the // underlying InputStream when all records have been read. http://git-wip-us.apache.org/repos/asf/spark/blob/cf33a862/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala -- diff --git
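The patch relies on the fact that most compression codecs carry their own checksums: by eagerly decompressing a fetched block, corruption surfaces as an `IOException`, and the block can then be re-fetched or the stage failed with a `FetchFailure` so it is retried elsewhere. Below is a rough, stand-alone sketch of that idea using plain GZIP in place of Spark's codecs; `fetchBlock` and the single-retry policy are assumptions for illustration, not the actual `ShuffleBlockFetcherIterator` logic.

```scala
// Sketch: detect a corrupt compressed block by decompressing it once, retrying the fetch on failure.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object CorruptBlockSketch {

  // Fully read the decompressed stream; a corrupt block typically fails in here.
  private def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    out.toByteArray
  }

  def fetchWithRetry(fetchBlock: () => Array[Byte], maxAttempts: Int = 2): Array[Byte] = {
    var attempt = 0
    var result: Option[Array[Byte]] = None
    while (result.isEmpty) {
      attempt += 1
      try {
        val wrapped = new GZIPInputStream(new ByteArrayInputStream(fetchBlock()))
        result = Some(readAll(wrapped)) // codec checksum failures raise IOException here
      } catch {
        case e: IOException if attempt < maxAttempts =>
          println(s"block looked corrupt ($e), retrying fetch")
        case e: IOException =>
          throw new IOException(s"block still corrupt after $attempt attempts", e)
      }
    }
    result.get
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out); gz.write("hello shuffle".getBytes("UTF-8")); gz.close()
    val good = out.toByteArray
    // A payload with flipped bytes would take the retry path above instead.
    println(new String(fetchWithRetry(() => good), "UTF-8"))
  }
}
```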
spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast
Repository: spark Updated Branches: refs/heads/branch-2.1 eb2d9bfd4 -> 562507ef0 [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast ## What changes were proposed in this pull request? This PR avoids that a result of a cast `toInt` is negative due to signed integer overflow (e.g. 0x__1???L.toInt < 0 ). This PR performs casts after we can ensure the value is within range of signed integer (the result of `max(array.length, ???)` is always integer). ## How was this patch tested? Manually executed query68 of TPC-DS with 100TB Author: Kazuaki IshizakiCloses #16235 from kiszk/SPARK-18745. (cherry picked from commit d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/562507ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/562507ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/562507ef Branch: refs/heads/branch-2.1 Commit: 562507ef038f09ff422e9831416af5119282a9d0 Parents: eb2d9bf Author: Kazuaki Ishizaki Authored: Fri Dec 9 23:13:36 2016 +0100 Committer: Herman van Hovell Committed: Fri Dec 9 23:13:50 2016 +0100 -- .../apache/spark/sql/execution/joins/HashedRelation.scala| 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/562507ef/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 8821c0d..b9f6601 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var offset: Long = Platform.LONG_ARRAY_OFFSET val end = len * 8L + Platform.LONG_ARRAY_OFFSET while (offset < end) { - val size = Math.min(buffer.length, (end - offset).toInt) + val size = Math.min(buffer.length, end - offset) Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size) - writeBuffer(buffer, 0, size) + writeBuffer(buffer, 0, size.toInt) offset += size } } @@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var offset: Long = Platform.LONG_ARRAY_OFFSET val end = length * 8L + Platform.LONG_ARRAY_OFFSET while (offset < end) { - val size = Math.min(buffer.length, (end - offset).toInt) - readBuffer(buffer, 0, size) + val size = Math.min(buffer.length, end - offset) + readBuffer(buffer, 0, size.toInt) Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size) offset += size } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast
Repository: spark Updated Branches: refs/heads/branch-2.0 65b4b0561 -> 2c342e5a4 [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast ## What changes were proposed in this pull request? This PR avoids that a result of a cast `toInt` is negative due to signed integer overflow (e.g. 0x__1???L.toInt < 0 ). This PR performs casts after we can ensure the value is within range of signed integer (the result of `max(array.length, ???)` is always integer). ## How was this patch tested? Manually executed query68 of TPC-DS with 100TB Author: Kazuaki IshizakiCloses #16235 from kiszk/SPARK-18745. (cherry picked from commit d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c342e5a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c342e5a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c342e5a Branch: refs/heads/branch-2.0 Commit: 2c342e5a4a9c88ed1ffc2ff19dd0b2eb3b6336ac Parents: 65b4b05 Author: Kazuaki Ishizaki Authored: Fri Dec 9 23:13:36 2016 +0100 Committer: Herman van Hovell Committed: Fri Dec 9 23:14:04 2016 +0100 -- .../apache/spark/sql/execution/joins/HashedRelation.scala| 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c342e5a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 8821c0d..b9f6601 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var offset: Long = Platform.LONG_ARRAY_OFFSET val end = len * 8L + Platform.LONG_ARRAY_OFFSET while (offset < end) { - val size = Math.min(buffer.length, (end - offset).toInt) + val size = Math.min(buffer.length, end - offset) Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size) - writeBuffer(buffer, 0, size) + writeBuffer(buffer, 0, size.toInt) offset += size } } @@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap var offset: Long = Platform.LONG_ARRAY_OFFSET val end = length * 8L + Platform.LONG_ARRAY_OFFSET while (offset < end) { - val size = Math.min(buffer.length, (end - offset).toInt) - readBuffer(buffer, 0, size) + val size = Math.min(buffer.length, end - offset) + readBuffer(buffer, 0, size.toInt) Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size) offset += size } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18745][SQL] Fix signed integer overflow due to toInt cast
Repository: spark
Updated Branches:
  refs/heads/master b08b50045 -> d60ab5fd9

[SPARK-18745][SQL] Fix signed integer overflow due to toInt cast

## What changes were proposed in this pull request?

This PR avoids that a result of a cast `toInt` is negative due to signed integer overflow (e.g. 0x__1???L.toInt < 0 ). This PR performs casts after we can ensure the value is within range of signed integer (the result of `max(array.length, ???)` is always integer).

## How was this patch tested?

Manually executed query68 of TPC-DS with 100TB

Author: Kazuaki Ishizaki

Closes #16235 from kiszk/SPARK-18745.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60ab5fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60ab5fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60ab5fd

Branch: refs/heads/master
Commit: d60ab5fd9b6af9aa5080a2d13b3589d8b79c5c5c
Parents: b08b500
Author: Kazuaki Ishizaki
Authored: Fri Dec 9 23:13:36 2016 +0100
Committer: Herman van Hovell
Committed: Fri Dec 9 23:13:36 2016 +0100

--
 .../apache/spark/sql/execution/joins/HashedRelation.scala| 8
 1 file changed, 4 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d60ab5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 8821c0d..b9f6601 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -670,9 +670,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     var offset: Long = Platform.LONG_ARRAY_OFFSET
     val end = len * 8L + Platform.LONG_ARRAY_OFFSET
     while (offset < end) {
-      val size = Math.min(buffer.length, (end - offset).toInt)
+      val size = Math.min(buffer.length, end - offset)
       Platform.copyMemory(arr, offset, buffer, Platform.BYTE_ARRAY_OFFSET, size)
-      writeBuffer(buffer, 0, size)
+      writeBuffer(buffer, 0, size.toInt)
       offset += size
     }
   }
@@ -710,8 +710,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     var offset: Long = Platform.LONG_ARRAY_OFFSET
     val end = length * 8L + Platform.LONG_ARRAY_OFFSET
     while (offset < end) {
-      val size = Math.min(buffer.length, (end - offset).toInt)
-      readBuffer(buffer, 0, size)
+      val size = Math.min(buffer.length, end - offset)
+      readBuffer(buffer, 0, size.toInt)
       Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, array, offset, size)
       offset += size
     }

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
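The overflow being fixed is easy to reproduce in isolation: once more than `Int.MaxValue` bytes remain, `(end - offset).toInt` wraps to a negative number and `Math.min` then returns that bogus negative size. Taking the minimum on `Long` first keeps the result bounded by `buffer.length`, so the final `toInt` is safe. A small sketch of the before/after arithmetic (the sizes below are made up; only the arithmetic mirrors the patched code):

```scala
// Demonstrates why the cast must happen after the min, not before.
object ToIntOverflowSketch {
  def main(args: Array[String]): Unit = {
    val bufferLength = 1 << 20                     // 1 MiB copy buffer
    val remaining: Long = 6L * 1024 * 1024 * 1024  // 6 GiB still to write

    // Before the fix: cast first, then min. The cast wraps to a negative
    // number, so Math.min picks the bogus negative size.
    val broken = Math.min(bufferLength, remaining.toInt)

    // After the fix: min on Long first (bounded by bufferLength, so it
    // always fits in an Int), cast last.
    val fixed = Math.min(bufferLength.toLong, remaining).toInt

    println(s"remaining.toInt = ${remaining.toInt}") // negative due to overflow
    println(s"broken size     = $broken")            // negative -> corrupt copy size
    println(s"fixed size      = $fixed")             // 1048576 as expected
  }
}
```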
spark git commit: [SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis
Repository: spark Updated Branches: refs/heads/master be5fc6ef7 -> b08b50045 [SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis ## What changes were proposed in this pull request? This pr is to make input rates in timeline more flat for spark streaming + kinesis. Since kinesis workers fetch records and push them into block generators in bulk, timeline in web UI has many spikes when `maxRates` applied (See a Figure.1 below). This fix splits fetched input records into multiple `adRecords` calls. Figure.1 Apply `maxRates=500` in vanilla Spark https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png;> Figure.2 Apply `maxRates=500` in Spark with my patch https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png;> ## How was this patch tested? Add tests to check to split input records into multiple `addRecords` calls. Author: Takeshi YAMAMUROCloses #16114 from maropu/SPARK-18620. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b08b5004 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b08b5004 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b08b5004 Branch: refs/heads/master Commit: b08b5004563b28d10b07b70946a9f72408ed228a Parents: be5fc6e Author: Takeshi YAMAMURO Authored: Sat Dec 10 05:32:04 2016 +0800 Committer: Sean Owen Committed: Sat Dec 10 05:32:04 2016 +0800 -- .../spark/streaming/kinesis/KinesisReceiver.scala | 6 ++ .../streaming/kinesis/KinesisRecordProcessor.scala | 14 -- .../streaming/kinesis/KinesisReceiverSuite.scala | 17 + 3 files changed, 35 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 858368d..393e56a 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T]( } } + /** Return the current rate limit defined in [[BlockGenerator]]. 
*/ + private[kinesis] def getCurrentLimit: Int = { +assert(blockGenerator != null) +math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt + } + /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { Option(shardIdToLatestStoredSeqNum.get(shardId)) http://git-wip-us.apache.org/repos/asf/spark/blob/b08b5004/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala -- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index a0ccd08..73ccc4a 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { if (!receiver.isStopped()) { try { -receiver.addRecords(shardId, batch) -logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") +// Limit the number of processed records from Kinesis stream. This is because the KCL cannot +// control the number of aggregated records to be fetched even if we set `MaxRecords` +// in `KinesisClientLibConfiguration`. For example, if we set 10 to the number of max +// records in a worker and a producer aggregates two records into one message, the worker +// possibly 20 records every callback function called. +val maxRecords = receiver.getCurrentLimit +for (start <- 0 until batch.size by maxRecords) { + val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size)) + receiver.addRecords(shardId, miniBatch) +
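Because the KCL can hand the record processor an arbitrarily large aggregated batch regardless of `MaxRecords`, the fix slices each batch into mini-batches no larger than the block generator's current rate limit before calling `addRecords`, which flattens the input-rate timeline. A simplified sketch of that slicing follows, using a plain Scala `Seq` in place of the KCL's `java.util.List[Record]`; `store` is a placeholder for `receiver.addRecords`.

```scala
// Sketch of splitting one fetched batch into rate-limited mini-batches.
object KinesisMiniBatchSketch {

  def addInMiniBatches[T](batch: Seq[T], maxRecords: Int)(store: Seq[T] => Unit): Unit = {
    require(maxRecords > 0, "rate limit must be positive")
    // Same shape as the patched loop: step through the batch maxRecords at a time.
    for (start <- 0 until batch.size by maxRecords) {
      val miniBatch = batch.slice(start, math.min(start + maxRecords, batch.size))
      store(miniBatch)
    }
  }

  def main(args: Array[String]): Unit = {
    val fetched = (1 to 23).map(i => s"record-$i")
    addInMiniBatches(fetched, maxRecords = 10) { mini =>
      println(s"stored ${mini.size} records (${mini.head} .. ${mini.last})")
    }
    // => stored 10, stored 10, stored 3: the UI sees a flatter input rate.
  }
}
```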
[2/2] spark-website git commit: html changes for mllib site
html changes for mllib site Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/a82adf04 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/a82adf04 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/a82adf04 Branch: refs/heads/asf-site Commit: a82adf043744269bfd81d78f2f04e0307fd4626b Parents: 057cad1 Author: Joseph K. BradleyAuthored: Wed Dec 7 12:43:12 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Dec 7 12:43:12 2016 -0800 -- site/mllib/index.html | 30 +- 1 file changed, 21 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/a82adf04/site/mllib/index.html -- diff --git a/site/mllib/index.html b/site/mllib/index.html index 11808fb..e29228b 100644 --- a/site/mllib/index.html +++ b/site/mllib/index.html @@ -220,7 +220,7 @@ data = spark.read.format("libsvm")\ .load("hdfs://...") -model = KMeans(data, k=10) +model = KMeans(k=10).fit(data) Calling MLlib in Python @@ -270,25 +270,37 @@ Algorithms - MLlib contains many algorithms and utilities, including: + MLlib contains many algorithms and utilities. + + + ML algorithms include: Classification: logistic regression, naive Bayes,... - Regression: generalized linear regression, isotonic regression,... + Regression: generalized linear regression, survival regression,... Decision trees, random forests, and gradient-boosted trees Recommendation: alternating least squares (ALS) Clustering: K-means, Gaussian mixtures (GMMs),... Topic modeling: latent Dirichlet allocation (LDA) + Frequent itemsets, association rules, and sequential pattern mining + + + ML workflow utilities include: + + Feature transformations: standardization, normalization, hashing,... - Model evaluation and hyper-parameter tuning ML Pipeline construction + Model evaluation and hyper-parameter tuning ML persistence: saving and loading models and Pipelines - Survival analysis: accelerated failure time model - Frequent itemset and sequential pattern mining: FP-growth, association rules, PrefixSpan - Distributed linear algebra: singular value decomposition (SVD), principal component analysis (PCA),... + + + Other utilities include: + + + Distributed linear algebra: SVD, PCA,... Statistics: summary statistics, hypothesis testing,... -Refer to the MLlib guide for usage examples. +Refer to the MLlib guide for usage examples. @@ -315,7 +327,7 @@ Download Spark. MLlib is included as a module. - Read the MLlib guide, which includes + Read the MLlib guide, which includes various usage examples. Learn how to deploy Spark on a cluster if you'd like to run in distributed mode. You can also run locally on a multicore machine - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark-website git commit: updated MLlib site for 2.1
Repository: spark-website Updated Branches: refs/heads/asf-site 8d5d77c65 -> a82adf043 updated MLlib site for 2.1 Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/057cad18 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/057cad18 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/057cad18 Branch: refs/heads/asf-site Commit: 057cad18d2bfbc16b11837ca9614927444b7ac47 Parents: 8d5d77c Author: Joseph K. BradleyAuthored: Wed Dec 7 12:12:33 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Dec 7 12:12:33 2016 -0800 -- mllib/index.md | 30 +- 1 file changed, 21 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/057cad18/mllib/index.md -- diff --git a/mllib/index.md b/mllib/index.md index 9c43750..bccd603 100644 --- a/mllib/index.md +++ b/mllib/index.md @@ -31,7 +31,7 @@ subproject: MLlib data = spark.read.format("libsvm")\ .load("hdfs://...") -model = KMeans(data, k=10) +model = KMeans(k=10).fit(data) Calling MLlib in Python @@ -81,25 +81,37 @@ subproject: MLlib Algorithms - MLlib contains many algorithms and utilities, including: + MLlib contains many algorithms and utilities. + + + ML algorithms include: Classification: logistic regression, naive Bayes,... - Regression: generalized linear regression, isotonic regression,... + Regression: generalized linear regression, survival regression,... Decision trees, random forests, and gradient-boosted trees Recommendation: alternating least squares (ALS) Clustering: K-means, Gaussian mixtures (GMMs),... Topic modeling: latent Dirichlet allocation (LDA) + Frequent itemsets, association rules, and sequential pattern mining + + + ML workflow utilities include: + + Feature transformations: standardization, normalization, hashing,... - Model evaluation and hyper-parameter tuning ML Pipeline construction + Model evaluation and hyper-parameter tuning ML persistence: saving and loading models and Pipelines - Survival analysis: accelerated failure time model - Frequent itemset and sequential pattern mining: FP-growth, association rules, PrefixSpan - Distributed linear algebra: singular value decomposition (SVD), principal component analysis (PCA),... + + + Other utilities include: + + + Distributed linear algebra: SVD, PCA,... Statistics: summary statistics, hypothesis testing,... -Refer to the MLlib guide for usage examples. +Refer to the MLlib guide for usage examples. @@ -126,7 +138,7 @@ subproject: MLlib Download Spark. MLlib is included as a module. - Read the MLlib guide, which includes + Read the MLlib guide, which includes various usage examples. Learn how to deploy Spark on a cluster if you'd like to run in distributed mode. You can also run locally on a multicore machine - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][SPARKR] Fix SparkR regex in copy command
Repository: spark
Updated Branches:
  refs/heads/master fd48d80a6 -> be5fc6ef7

[MINOR][SPARKR] Fix SparkR regex in copy command

Fix SparkR package copy regex. The existing code leads to

```
Copying release tarballs to /home//public_html/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-SNAPSHOT-2016_12_08_22_38-e8f351f-bin
mput: SparkR-*: no files found
```

Author: Shivaram Venkataraman

Closes #16231 from shivaram/typo-sparkr-build.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be5fc6ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be5fc6ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be5fc6ef

Branch: refs/heads/master
Commit: be5fc6ef72c7eb586b184b0f42ac50ef32843208
Parents: fd48d80
Author: Shivaram Venkataraman
Authored: Fri Dec 9 10:12:56 2016 -0800
Committer: Shivaram Venkataraman
Committed: Fri Dec 9 10:12:56 2016 -0800

--
 dev/create-release/release-build.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/be5fc6ef/dev/create-release/release-build.sh
--
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index c0663b8..b08577c 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -252,7 +252,7 @@ if [[ "$1" == "package" ]]; then
     LFTP mkdir -p $dest_dir
     LFTP mput -O $dest_dir 'spark-*'
     LFTP mput -O $dest_dir 'pyspark-*'
-    LFTP mput -O $dest_dir 'SparkR-*'
+    LFTP mput -O $dest_dir 'SparkR_*'
     # Delete /latest directory and rename new upload to /latest
     LFTP "rm -r -f $REMOTE_PARENT_DIR/latest || exit 0"
     LFTP mv $dest_dir "$REMOTE_PARENT_DIR/latest"
@@ -260,7 +260,7 @@ if [[ "$1" == "package" ]]; then
     LFTP mkdir -p $dest_dir
     LFTP mput -O $dest_dir 'spark-*'
     LFTP mput -O $dest_dir 'pyspark-*'
-    LFTP mput -O $dest_dir 'SparkR-*'
+    LFTP mput -O $dest_dir 'SparkR_*'
     exit 0
   fi

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][SPARKR] Fix SparkR regex in copy command
Repository: spark Updated Branches: refs/heads/branch-2.1 0c6415aec -> eb2d9bfd4 [MINOR][SPARKR] Fix SparkR regex in copy command Fix SparkR package copy regex. The existing code leads to ``` Copying release tarballs to /home//public_html/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-SNAPSHOT-2016_12_08_22_38-e8f351f-bin mput: SparkR-*: no files found ``` Author: Shivaram VenkataramanCloses #16231 from shivaram/typo-sparkr-build. (cherry picked from commit be5fc6ef72c7eb586b184b0f42ac50ef32843208) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb2d9bfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb2d9bfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb2d9bfd Branch: refs/heads/branch-2.1 Commit: eb2d9bfd4e100789604ca0810929b42694ea7377 Parents: 0c6415a Author: Shivaram Venkataraman Authored: Fri Dec 9 10:12:56 2016 -0800 Committer: Shivaram Venkataraman Committed: Fri Dec 9 10:13:05 2016 -0800 -- dev/create-release/release-build.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb2d9bfd/dev/create-release/release-build.sh -- diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index c0663b8..b08577c 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -252,7 +252,7 @@ if [[ "$1" == "package" ]]; then LFTP mkdir -p $dest_dir LFTP mput -O $dest_dir 'spark-*' LFTP mput -O $dest_dir 'pyspark-*' - LFTP mput -O $dest_dir 'SparkR-*' + LFTP mput -O $dest_dir 'SparkR_*' # Delete /latest directory and rename new upload to /latest LFTP "rm -r -f $REMOTE_PARENT_DIR/latest || exit 0" LFTP mv $dest_dir "$REMOTE_PARENT_DIR/latest" @@ -260,7 +260,7 @@ if [[ "$1" == "package" ]]; then LFTP mkdir -p $dest_dir LFTP mput -O $dest_dir 'spark-*' LFTP mput -O $dest_dir 'pyspark-*' - LFTP mput -O $dest_dir 'SparkR-*' + LFTP mput -O $dest_dir 'SparkR_*' exit 0 fi - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend
Repository: spark Updated Branches: refs/heads/branch-2.0 44df6d2ce -> 65b4b0561 [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend ## What changes were proposed in this pull request? * This PR changes `JVMObjectTracker` from `object` to `class` and let its instance associated with each RBackend. So we can manage the lifecycle of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` will clear the object tracker explicitly. * I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, which could be wrong. * Small refactor of `SerDe.sqlSerDe` to increase readability. ## How was this patch tested? * Added unit tests for `JVMObjectTracker`. * Wait for Jenkins to run full tests. Author: Xiangrui MengCloses #16154 from mengxr/SPARK-17822. (cherry picked from commit fd48d80a6145ea94f03e7fc6e4d724a0fbccac58) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65b4b056 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65b4b056 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65b4b056 Branch: refs/heads/branch-2.0 Commit: 65b4b05616bf8f5cf70a618cc15d379634e9b42d Parents: 44df6d2 Author: Xiangrui Meng Authored: Fri Dec 9 07:51:46 2016 -0800 Committer: Xiangrui Meng Committed: Fri Dec 9 07:55:58 2016 -0800 -- .../apache/spark/api/r/JVMObjectTracker.scala | 87 ++ .../scala/org/apache/spark/api/r/RBackend.scala | 4 + .../apache/spark/api/r/RBackendHandler.scala| 54 ++-- .../scala/org/apache/spark/api/r/RRunner.scala | 2 +- .../scala/org/apache/spark/api/r/SerDe.scala| 92 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 .../org/apache/spark/api/r/RBackendSuite.scala | 31 +++ .../org/apache/spark/sql/api/r/SQLUtils.scala | 12 +-- 8 files changed, 264 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/65b4b056/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala new file mode 100644 index 000..3432700 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.r + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ConcurrentHashMap + +/** JVM object ID wrapper */ +private[r] case class JVMObjectId(id: String) { + require(id != null, "Object ID cannot be null.") +} + +/** + * Counter that tracks JVM objects returned to R. + * This is useful for referencing these objects in RPC calls. 
+ */ +private[r] class JVMObjectTracker { + + private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]() + private[this] val objCounter = new AtomicInteger() + + /** + * Returns the JVM object associated with the input key or None if not found. + */ + final def get(id: JVMObjectId): Option[Object] = this.synchronized { +if (objMap.containsKey(id)) { + Some(objMap.get(id)) +} else { + None +} + } + + /** + * Returns the JVM object associated with the input key or throws an exception if not found. + */ + @throws[NoSuchElementException]("if key does not exist.") + final def apply(id: JVMObjectId): Object = { +get(id).getOrElse( + throw new NoSuchElementException(s"$id does not exist.") +) + } + + /** + * Adds a JVM object to track and returns assigned ID, which is unique within this tracker. + */ + final def addAndGetId(obj: Object): JVMObjectId = { +val id = JVMObjectId(objCounter.getAndIncrement().toString) +objMap.put(id, obj) +id + } + + /** + * Removes and returns a JVM object with the
spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend
Repository: spark Updated Branches: refs/heads/branch-2.1 b226f10e3 -> 0c6415aec [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend ## What changes were proposed in this pull request? * This PR changes `JVMObjectTracker` from `object` to `class` and let its instance associated with each RBackend. So we can manage the lifecycle of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` will clear the object tracker explicitly. * I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, which could be wrong. * Small refactor of `SerDe.sqlSerDe` to increase readability. ## How was this patch tested? * Added unit tests for `JVMObjectTracker`. * Wait for Jenkins to run full tests. Author: Xiangrui MengCloses #16154 from mengxr/SPARK-17822. (cherry picked from commit fd48d80a6145ea94f03e7fc6e4d724a0fbccac58) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c6415ae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c6415ae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c6415ae Branch: refs/heads/branch-2.1 Commit: 0c6415aeca7a5c2fc5462c483c60d770f0236efe Parents: b226f10 Author: Xiangrui Meng Authored: Fri Dec 9 07:51:46 2016 -0800 Committer: Xiangrui Meng Committed: Fri Dec 9 07:51:58 2016 -0800 -- .../apache/spark/api/r/JVMObjectTracker.scala | 87 ++ .../scala/org/apache/spark/api/r/RBackend.scala | 6 +- .../apache/spark/api/r/RBackendHandler.scala| 54 ++-- .../scala/org/apache/spark/api/r/RRunner.scala | 2 +- .../scala/org/apache/spark/api/r/SerDe.scala| 92 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 .../org/apache/spark/api/r/RBackendSuite.scala | 31 +++ .../org/apache/spark/sql/api/r/SQLUtils.scala | 12 +-- 8 files changed, 265 insertions(+), 92 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c6415ae/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala new file mode 100644 index 000..3432700 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.r + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ConcurrentHashMap + +/** JVM object ID wrapper */ +private[r] case class JVMObjectId(id: String) { + require(id != null, "Object ID cannot be null.") +} + +/** + * Counter that tracks JVM objects returned to R. + * This is useful for referencing these objects in RPC calls. 
+ */ +private[r] class JVMObjectTracker { + + private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]() + private[this] val objCounter = new AtomicInteger() + + /** + * Returns the JVM object associated with the input key or None if not found. + */ + final def get(id: JVMObjectId): Option[Object] = this.synchronized { +if (objMap.containsKey(id)) { + Some(objMap.get(id)) +} else { + None +} + } + + /** + * Returns the JVM object associated with the input key or throws an exception if not found. + */ + @throws[NoSuchElementException]("if key does not exist.") + final def apply(id: JVMObjectId): Object = { +get(id).getOrElse( + throw new NoSuchElementException(s"$id does not exist.") +) + } + + /** + * Adds a JVM object to track and returns assigned ID, which is unique within this tracker. + */ + final def addAndGetId(obj: Object): JVMObjectId = { +val id = JVMObjectId(objCounter.getAndIncrement().toString) +objMap.put(id, obj) +id + } + + /** + * Removes and returns a JVM object with the
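For reference, a minimal usage sketch of the tracker API visible in the diff above (`addAndGetId`, `get`, `apply`, `remove`). The tracked value is made up for illustration, and since the class is `private[r]` code like this would have to live in the `org.apache.spark.api.r` package, e.g. inside a test:

// Hypothetical sketch; only the JVMObjectTracker methods shown in the diff above are assumed.
val tracker = new JVMObjectTracker

// Register a JVM object created on behalf of an R client and hand the id back to R.
val obj: Object = new java.util.ArrayList[String]()
val id: JVMObjectId = tracker.addAndGetId(obj)

// Later RPC calls from R refer to the object by its id.
assert(tracker.get(id) == Some(obj))   // Option-returning lookup
assert(tracker(id) eq obj)             // apply() throws NoSuchElementException for unknown ids

// When the R side releases the handle (or the backend session shuts down), drop it again.
tracker.remove(id)
assert(tracker.get(id).isEmpty)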
spark git commit: [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend
Repository: spark Updated Branches: refs/heads/master b162cc0c2 -> fd48d80a6 [SPARK-17822][R] Make JVMObjectTracker a member variable of RBackend
## What changes were proposed in this pull request? * This PR changes `JVMObjectTracker` from `object` to `class` and lets its instance be associated with each RBackend. So we can manage the lifecycle of JVM objects when there are multiple `RBackend` sessions. `RBackend.close` will clear the object tracker explicitly. * I assume that `SQLUtils` and `RRunner` do not need to track JVM instances, which could be wrong. * Small refactor of `SerDe.sqlSerDe` to increase readability.
## How was this patch tested? * Added unit tests for `JVMObjectTracker`. * Wait for Jenkins to run full tests.
Author: Xiangrui Meng. Closes #16154 from mengxr/SPARK-17822.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd48d80a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd48d80a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd48d80a Branch: refs/heads/master Commit: fd48d80a6145ea94f03e7fc6e4d724a0fbccac58 Parents: b162cc0 Author: Xiangrui Meng Authored: Fri Dec 9 07:51:46 2016 -0800 Committer: Xiangrui Meng Committed: Fri Dec 9 07:51:46 2016 -0800
-- .../apache/spark/api/r/JVMObjectTracker.scala | 87 ++ .../scala/org/apache/spark/api/r/RBackend.scala | 6 +- .../apache/spark/api/r/RBackendHandler.scala| 54 ++-- .../scala/org/apache/spark/api/r/RRunner.scala | 2 +- .../scala/org/apache/spark/api/r/SerDe.scala| 92 .../spark/api/r/JVMObjectTrackerSuite.scala | 73 .../org/apache/spark/api/r/RBackendSuite.scala | 31 +++ .../org/apache/spark/sql/api/r/SQLUtils.scala | 12 +-- 8 files changed, 265 insertions(+), 92 deletions(-) --
http://git-wip-us.apache.org/repos/asf/spark/blob/fd48d80a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala new file mode 100644 index 000..3432700 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/r/JVMObjectTracker.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.r + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ConcurrentHashMap + +/** JVM object ID wrapper */ +private[r] case class JVMObjectId(id: String) { + require(id != null, "Object ID cannot be null.") +} + +/** + * Counter that tracks JVM objects returned to R. + * This is useful for referencing these objects in RPC calls.
+ */ +private[r] class JVMObjectTracker { + + private[this] val objMap = new ConcurrentHashMap[JVMObjectId, Object]() + private[this] val objCounter = new AtomicInteger() + + /** + * Returns the JVM object associated with the input key or None if not found. + */ + final def get(id: JVMObjectId): Option[Object] = this.synchronized { +if (objMap.containsKey(id)) { + Some(objMap.get(id)) +} else { + None +} + } + + /** + * Returns the JVM object associated with the input key or throws an exception if not found. + */ + @throws[NoSuchElementException]("if key does not exist.") + final def apply(id: JVMObjectId): Object = { +get(id).getOrElse( + throw new NoSuchElementException(s"$id does not exist.") +) + } + + /** + * Adds a JVM object to track and returns assigned ID, which is unique within this tracker. + */ + final def addAndGetId(obj: Object): JVMObjectId = { +val id = JVMObjectId(objCounter.getAndIncrement().toString) +objMap.put(id, obj) +id + } + + /** + * Removes and returns a JVM object with the specific ID from the tracker, or None if not found. + */ + final def remove(id: JVMObjectId): Option[Object] = this.synchronized {
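The diff stats above list a new JVMObjectTrackerSuite (73 lines) that is not reproduced in this message. A hedged sketch of the kind of assertions such a suite could make, using only the API shown in the diff:

// Sketch only; not the verbatim JVMObjectTrackerSuite added by this commit.
package org.apache.spark.api.r

import org.apache.spark.SparkFunSuite

class JVMObjectTrackerSketchSuite extends SparkFunSuite {
  test("ids are unique within a tracker and removed objects disappear") {
    val tracker = new JVMObjectTracker
    val first = tracker.addAndGetId(new Object)
    val second = tracker.addAndGetId(new Object)
    assert(first != second)                // counter-based ids never repeat within one tracker
    assert(tracker.get(first).isDefined)
    tracker.remove(first)
    assert(tracker.get(first).isEmpty)
    intercept[NoSuchElementException] {
      tracker(first)                       // apply() throws once the id has been removed
    }
  }
}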
spark git commit: [MINOR][CORE][SQL][DOCS] Typo fixes
Repository: spark Updated Branches: refs/heads/branch-2.1 72bf51997 -> b226f10e3 [MINOR][CORE][SQL][DOCS] Typo fixes ## What changes were proposed in this pull request? Typo fixes ## How was this patch tested? Local build. Awaiting the official build. Author: Jacek LaskowskiCloses #16144 from jaceklaskowski/typo-fixes. (cherry picked from commit b162cc0c2810c1a9fa2eee8e664ffae84f9eea11) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b226f10e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b226f10e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b226f10e Branch: refs/heads/branch-2.1 Commit: b226f10e3df8b789da6ef820b256f994b178fbbe Parents: 72bf519 Author: Jacek Laskowski Authored: Fri Dec 9 18:45:57 2016 +0800 Committer: Sean Owen Committed: Fri Dec 9 18:46:32 2016 +0800 -- core/src/main/scala/org/apache/spark/MapOutputTracker.scala| 2 +- core/src/main/scala/org/apache/spark/SparkContext.scala| 4 ++-- .../apache/spark/deploy/history/HistoryServerArguments.scala | 2 +- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala | 2 +- docs/monitoring.md | 6 ++ .../main/java/org/apache/spark/sql/streaming/OutputMode.java | 2 +- .../main/scala/org/apache/spark/sql/catalyst/InternalRow.scala | 2 +- .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala| 2 +- .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 6 +++--- .../spark/sql/catalyst/expressions/objects/objects.scala | 2 +- 12 files changed, 16 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 7f8f0f5..6f5c31d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, if (minSizeForBroadcast > maxRpcMessageSize) { val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " + s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " + - "message that is to large." + "message that is too large." 
logError(msg) throw new IllegalArgumentException(msg) } http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8f8392f..b6aeeb9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2567,8 +2567,8 @@ object SparkContext extends Logging { val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { - throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " + - s"for the url $url:") + throw new SparkException( +s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption } http://git-wip-us.apache.org/repos/asf/spark/blob/b226f10e/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 2eddb5f..080ba12 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** - * Command-line parser for the master. + * Command-line parser for the [[HistoryServer]]. */ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging {
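The MapOutputTracker hunk at the top of this diff only rewords an error message, but the check it belongs to documents a real constraint between two configs. A small illustrative sketch of settings that satisfy it (the values are arbitrary; only the config names come from the code above):

// Illustrative values only. spark.shuffle.mapOutput.minSizeForBroadcast must stay
// <= spark.rpc.message.maxSize, or MapOutputTrackerMaster throws the
// IllegalArgumentException whose wording this commit fixes.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.rpc.message.maxSize", "128")                     // RPC limit, in MB
  .set("spark.shuffle.mapOutput.minSizeForBroadcast", "512k")  // well below 128 MB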
spark git commit: [MINOR][CORE][SQL][DOCS] Typo fixes
Repository: spark Updated Branches: refs/heads/master 67587d961 -> b162cc0c2 [MINOR][CORE][SQL][DOCS] Typo fixes ## What changes were proposed in this pull request? Typo fixes ## How was this patch tested? Local build. Awaiting the official build. Author: Jacek LaskowskiCloses #16144 from jaceklaskowski/typo-fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b162cc0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b162cc0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b162cc0c Branch: refs/heads/master Commit: b162cc0c2810c1a9fa2eee8e664ffae84f9eea11 Parents: 67587d9 Author: Jacek Laskowski Authored: Fri Dec 9 18:45:57 2016 +0800 Committer: Sean Owen Committed: Fri Dec 9 18:45:57 2016 +0800 -- core/src/main/scala/org/apache/spark/MapOutputTracker.scala| 2 +- core/src/main/scala/org/apache/spark/SparkContext.scala| 4 ++-- .../apache/spark/deploy/history/HistoryServerArguments.scala | 2 +- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala | 2 +- docs/monitoring.md | 6 ++ .../main/java/org/apache/spark/sql/streaming/OutputMode.java | 2 +- .../main/scala/org/apache/spark/sql/catalyst/InternalRow.scala | 2 +- .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala| 2 +- .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 6 +++--- .../spark/sql/catalyst/expressions/objects/objects.scala | 2 +- 12 files changed, 16 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/MapOutputTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 7f8f0f5..6f5c31d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,7 +322,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, if (minSizeForBroadcast > maxRpcMessageSize) { val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " + s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " + - "message that is to large." + "message that is too large." 
logError(msg) throw new IllegalArgumentException(msg) } http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b42820a..02c009c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2570,8 +2570,8 @@ object SparkContext extends Logging { val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { - throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " + - s"for the url $url:") + throw new SparkException( +s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption } http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 2eddb5f..080ba12 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** - * Command-line parser for the master. + * Command-line parser for the [[HistoryServer]]. */ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/b162cc0c/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git
spark git commit: [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic
Repository: spark Updated Branches: refs/heads/branch-2.1 2c88e1dc3 -> 72bf51997 [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic Make stateful udf as nondeterministic Add new test cases with both Stateful and Stateless UDF. Without the patch, the test cases will throw exception: 1 did not equal 10 ScalaTestFailureLocation: org.apache.spark.sql.hive.execution.HiveUDFSuite$$anonfun$21 at (HiveUDFSuite.scala:501) org.scalatest.exceptions.TestFailedException: 1 did not equal 10 at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) ... Author: Zhan ZhangCloses #16068 from zhzhan/state. (cherry picked from commit 67587d961d5f94a8639c20cb80127c86bf79d5a8) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72bf5199 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72bf5199 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72bf5199 Branch: refs/heads/branch-2.1 Commit: 72bf5199738c7ab0361b2b55eb4f4299048a21fa Parents: 2c88e1d Author: Zhan Zhang Authored: Fri Dec 9 16:35:06 2016 +0800 Committer: Wenchen Fan Committed: Fri Dec 9 16:36:35 2016 +0800 -- .../org/apache/spark/sql/hive/hiveUDFs.scala| 4 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 45 +++- 2 files changed, 45 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72bf5199/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index e30e0f9..37414ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -59,7 +59,7 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val isUDFDeterministic = { val udfType = function.getClass().getAnnotation(classOf[HiveUDFType]) -udfType != null && udfType.deterministic() +udfType != null && udfType.deterministic() && !udfType.stateful() } override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) @@ -142,7 +142,7 @@ private[hive] case class HiveGenericUDF( @transient private lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) -udfType != null && udfType.deterministic() +udfType != null && udfType.deterministic() && !udfType.stateful() } @transient http://git-wip-us.apache.org/repos/asf/spark/blob/72bf5199/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 48adc83..4098bb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -21,15 +21,17 @@ import java.io.{DataInput, DataOutput, File, PrintWriter} import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.ql.udf.UDAFPercentile +import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject import 
org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory -import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.{LongWritable, Writable} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS
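The test above registers a StatefulUDF (and a StatelessUDF) whose definitions are cut off in this message. A hedged sketch of what such helpers typically look like; the counter body is an assumption, only the `UDFType` annotation, the `UDF` base class, and the `LongWritable` import appear in the diff:

// Sketch of a stateful Hive UDF. Even though it is annotated deterministic = true,
// stateful = true now makes isUDFDeterministic false, so Spark will not constant-fold
// it and will evaluate it once per row. The counter logic itself is illustrative.
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.udf.UDFType
import org.apache.hadoop.io.LongWritable

@UDFType(deterministic = true, stateful = true)
class StatefulUDF extends UDF {
  private val result = new LongWritable(0)
  def evaluate(): LongWritable = {
    result.set(result.get() + 1)   // per-instance state carried across rows
    result
  }
}

@UDFType(deterministic = true)
class StatelessUDF extends UDF {
  private val result = new LongWritable(1)
  def evaluate(): LongWritable = result
}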
spark git commit: [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic
Repository: spark Updated Branches: refs/heads/master c074c96dc -> 67587d961 [SPARK-18637][SQL] Stateful UDF should be considered as nondeterministic ## What changes were proposed in this pull request? Make stateful udf as nondeterministic ## How was this patch tested? Add new test cases with both Stateful and Stateless UDF. Without the patch, the test cases will throw exception: 1 did not equal 10 ScalaTestFailureLocation: org.apache.spark.sql.hive.execution.HiveUDFSuite$$anonfun$21 at (HiveUDFSuite.scala:501) org.scalatest.exceptions.TestFailedException: 1 did not equal 10 at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) ... Author: Zhan ZhangCloses #16068 from zhzhan/state. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67587d96 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67587d96 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67587d96 Branch: refs/heads/master Commit: 67587d961d5f94a8639c20cb80127c86bf79d5a8 Parents: c074c96 Author: Zhan Zhang Authored: Fri Dec 9 16:35:06 2016 +0800 Committer: Wenchen Fan Committed: Fri Dec 9 16:35:06 2016 +0800 -- .../org/apache/spark/sql/hive/hiveUDFs.scala| 4 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 45 +++- 2 files changed, 45 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67587d96/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 349faae..26dc372 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -61,7 +61,7 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) -udfType != null && udfType.deterministic() +udfType != null && udfType.deterministic() && !udfType.stateful() } override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) @@ -144,7 +144,7 @@ private[hive] case class HiveGenericUDF( @transient private lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) -udfType != null && udfType.deterministic() +udfType != null && udfType.deterministic() && !udfType.stateful() } @transient http://git-wip-us.apache.org/repos/asf/spark/blob/67587d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 48adc83..4098bb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -21,15 +21,17 @@ import java.io.{DataInput, DataOutput, File, PrintWriter} import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.ql.udf.UDAFPercentile +import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject import 
org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory -import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.{LongWritable, Writable} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { assert(count4 == 1) sql("DROP TABLE parquet_tmp") } + + test("Hive Stateful UDF") { +withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) { + sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'") + sql(s"CREATE TEMPORARY
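The new test body is truncated above right after the first CREATE TEMPORARY FUNCTION statement. As a rough, hedged continuation sketch (not the verbatim suite), the rest of such a test could register both functions and then check that only the stateful one is re-evaluated per row; the query shape and expected count are assumptions based on the "1 did not equal 10" failure quoted in the commit message:

// Hedged continuation sketch, written as if inside a HiveUDFSuite test body.
withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
  sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
  sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")

  spark.range(10).repartition(1).createOrReplaceTempView("inputTable")

  // Without the fix the stateful UDF is treated as deterministic and folded to a single
  // evaluation, so MAX(state) comes back as 1 instead of 10.
  checkAnswer(
    sql("SELECT MAX(state) FROM (SELECT statefulUDF() AS state FROM inputTable) t"),
    Row(10L))
}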