spark git commit: [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile
Repository: spark Updated Branches: refs/heads/master db5165246 -> eeb58722a [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile SparkR support ```read.parquet``` and deprecate ```parquetFile```. This change is similar with #10145 for ```jsonFile```. Author: Yanbo LiangCloses #10191 from yanboliang/spark-12198. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeb58722 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeb58722 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeb58722 Branch: refs/heads/master Commit: eeb58722ad73441eeb5f35f864be3c5392cfd426 Parents: db51652 Author: Yanbo Liang Authored: Thu Dec 10 09:44:53 2015 -0800 Committer: Shivaram Venkataraman Committed: Thu Dec 10 09:44:53 2015 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/SQLContext.R | 16 ++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++ 3 files changed, 22 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 565a2b1..ba64bc5 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -270,6 +270,7 @@ export("as.DataFrame", "loadDF", "parquetFile", "read.df", + "read.parquet", "sql", "table", "tableNames", http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 85541c8..f678c70 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -256,18 +256,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { } } - #' Create a DataFrame from a Parquet file. #' #' Loads a Parquet file, returning the result as a DataFrame. #' #' @param sqlContext SQLContext to use -#' @param ... Path(s) of parquet file(s) to read. +#' @param path Path of file to read. A vector of multiple paths is allowed. #' @return DataFrame +#' @rdname read.parquet +#' @name read.parquet #' @export +read.parquet <- function(sqlContext, path) { + # Allow the user to have a more flexible definiton of the text file path + paths <- as.list(suppressWarnings(normalizePath(path))) + read <- callJMethod(sqlContext, "read") + sdf <- callJMethod(read, "parquet", paths) + dataFrame(sdf) +} +#' @rdname read.parquet +#' @name parquetFile +#' @export # TODO: Implement saveasParquetFile and write examples for both parquetFile <- function(sqlContext, ...) 
{ + .Deprecated("read.parquet") # Allow the user to have a more flexible definiton of the text file path paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x))) sdf <- callJMethod(sqlContext, "parquetFile", paths) http://git-wip-us.apache.org/repos/asf/spark/blob/eeb58722/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 39fc94a..222c04a 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1420,22 +1420,25 @@ test_that("mutate(), transform(), rename() and names()", { detach(airquality) }) -test_that("write.df() on DataFrame and works with parquetFile", { +test_that("write.df() on DataFrame and works with read.parquet", { df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlContext, parquetPath) + parquetDF <- read.parquet(sqlContext, parquetPath) expect_is(parquetDF, "DataFrame") expect_equal(count(df), count(parquetDF)) }) -test_that("parquetFile works with multiple input paths", { +test_that("read.parquet()/parquetFile() works with multiple input paths", { df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") write.df(df, parquetPath2, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2) + parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2)) expect_is(parquetDF, "DataFrame") expect_equal(count(parquetDF), count(df) * 2) + parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2)) + expect_is(parquetDF2, "DataFrame") + expect_equal(count(parquetDF2), count(df) * 2) # Test if varargs works with variables saveMode <- "overwrite"
spark git commit: [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit
Repository: spark Updated Branches: refs/heads/master eeb58722a -> 9fba9c800 [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit jira: https://issues.apache.org/jira/browse/SPARK-11602 Made a pass on the API change of 1.6. Open the PR for efficient discussion. Author: Yuhao YangCloses #9939 from hhbyyh/auditScala. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fba9c80 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fba9c80 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fba9c80 Branch: refs/heads/master Commit: 9fba9c8004d2b97549e5456fa7918965bec27336 Parents: eeb5872 Author: Yuhao Yang Authored: Thu Dec 10 10:15:50 2015 -0800 Committer: Joseph K. Bradley Committed: Thu Dec 10 10:15:50 2015 -0800 -- .../src/main/scala/org/apache/spark/ml/feature/package-info.java | 2 +- .../scala/org/apache/spark/ml/regression/LinearRegression.scala | 2 +- .../org/apache/spark/mllib/clustering/BisectingKMeans.scala | 2 +- .../org/apache/spark/mllib/clustering/BisectingKMeansModel.scala | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java index c22d2e0..7a35f2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java @@ -23,7 +23,7 @@ * features into more suitable forms for model fitting. * Most feature transformers are implemented as {@link org.apache.spark.ml.Transformer}s, which * transforms one {@link org.apache.spark.sql.DataFrame} into another, e.g., - * {@link org.apache.spark.feature.HashingTF}. + * {@link org.apache.spark.ml.feature.HashingTF}. * Some feature transformers are implemented as {@link org.apache.spark.ml.Estimator}}s, because the * transformation requires some aggregated information of the dataset, e.g., document * frequencies in {@link org.apache.spark.ml.feature.IDF}. 
http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 1db9166..5e58509 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -529,7 +529,7 @@ class LinearRegressionSummary private[regression] ( val predictionCol: String, val labelCol: String, val model: LinearRegressionModel, -val diagInvAtWA: Array[Double]) extends Serializable { +private val diagInvAtWA: Array[Double]) extends Serializable { @transient private val metrics = new RegressionMetrics( predictions http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 82adfa6..54bf510 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -407,7 +407,7 @@ private object BisectingKMeans extends Serializable { */ @Since("1.6.0") @Experimental -class ClusteringTreeNode private[clustering] ( +private[clustering] class ClusteringTreeNode private[clustering] ( val index: Int, val size: Long, private val centerWithNorm: VectorWithNorm, http://git-wip-us.apache.org/repos/asf/spark/blob/9fba9c80/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index f942e56..9ccf96b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -32,8 +32,8 @@ import org.apache.spark.rdd.RDD */
spark git commit: [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema
Repository: spark Updated Branches: refs/heads/master d9d354ed4 -> bc5f56aa6 [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema https://issues.apache.org/jira/browse/SPARK-12250 Author: Yin HuaiCloses #10236 from yhuai/SPARK-12250. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc5f56aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc5f56aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc5f56aa Branch: refs/heads/master Commit: bc5f56aa60a430244ffa0cacd81c0b1ecbf8d68f Parents: d9d354e Author: Yin Huai Authored: Thu Dec 10 12:03:29 2015 -0800 Committer: Yin Huai Committed: Thu Dec 10 12:03:29 2015 -0800 -- .../spark/sql/execution/aggregate/udaf.scala| 5 -- .../hive/execution/AggregationQuerySuite.scala | 64 2 files changed, 64 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc5f56aa/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index 20359c1..c0d0010 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -332,11 +332,6 @@ private[sql] case class ScalaUDAF( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newInputAggBufferOffset) - require( -children.length == udaf.inputSchema.length, -s"$udaf only accepts ${udaf.inputSchema.length} arguments, " + - s"but ${children.length} are provided.") - override def nullable: Boolean = true override def dataType: DataType = udaf.dataType http://git-wip-us.apache.org/repos/asf/spark/blob/bc5f56aa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 39c0a2a..064c000 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -66,6 +66,33 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun } } +class ScalaAggregateFunctionWithoutInputSchema extends UserDefinedAggregateFunction { + + def inputSchema: StructType = StructType(Nil) + + def bufferSchema: StructType = StructType(StructField("value", LongType) :: Nil) + + def dataType: DataType = LongType + + def deterministic: Boolean = true + + def initialize(buffer: MutableAggregationBuffer): Unit = { +buffer.update(0, 0L) + } + + def update(buffer: MutableAggregationBuffer, input: Row): Unit = { +buffer.update(0, input.getAs[Seq[Row]](0).map(_.getAs[Int]("v")).sum + buffer.getLong(0)) + } + + def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { +buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0)) + } + + def evaluate(buffer: Row): Any = { +buffer.getLong(0) + } +} + class LongProductSum extends UserDefinedAggregateFunction { def inputSchema: StructType = new StructType() .add("a", LongType) @@ -858,6 +885,43 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te ) } } + + test("udaf without specifying 
inputSchema") { +withTempTable("noInputSchemaUDAF") { + sqlContext.udf.register("noInputSchema", new ScalaAggregateFunctionWithoutInputSchema) + + val data = +Row(1, Seq(Row(1), Row(2), Row(3))) :: + Row(1, Seq(Row(4), Row(5), Row(6))) :: + Row(2, Seq(Row(-10))) :: Nil + val schema = +StructType( + StructField("key", IntegerType) :: +StructField("myArray", + ArrayType(StructType(StructField("v", IntegerType) :: Nil))) :: Nil) + sqlContext.createDataFrame( +sparkContext.parallelize(data, 2), +schema) +.registerTempTable("noInputSchemaUDAF") + + checkAnswer( +sqlContext.sql( + """ +|SELECT key, noInputSchema(myArray) +|FROM noInputSchemaUDAF +|GROUP BY key + """.stripMargin), +Row(1, 21) :: Row(2, -10) :: Nil) + + checkAnswer( +sqlContext.sql( +
spark git commit: [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema
Repository: spark Updated Branches: refs/heads/branch-1.6 e541f703d -> 594fafc61 [SPARK-12250][SQL] Allow users to define a UDAF without providing details of its inputSchema https://issues.apache.org/jira/browse/SPARK-12250 Author: Yin HuaiCloses #10236 from yhuai/SPARK-12250. (cherry picked from commit bc5f56aa60a430244ffa0cacd81c0b1ecbf8d68f) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/594fafc6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/594fafc6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/594fafc6 Branch: refs/heads/branch-1.6 Commit: 594fafc6122ad9c6b24bdb4a434d97158c7745f3 Parents: e541f70 Author: Yin Huai Authored: Thu Dec 10 12:03:29 2015 -0800 Committer: Yin Huai Committed: Thu Dec 10 12:03:40 2015 -0800 -- .../spark/sql/execution/aggregate/udaf.scala| 5 -- .../hive/execution/AggregationQuerySuite.scala | 64 2 files changed, 64 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/594fafc6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index 20359c1..c0d0010 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -332,11 +332,6 @@ private[sql] case class ScalaUDAF( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newInputAggBufferOffset) - require( -children.length == udaf.inputSchema.length, -s"$udaf only accepts ${udaf.inputSchema.length} arguments, " + - s"but ${children.length} are provided.") - override def nullable: Boolean = true override def dataType: DataType = udaf.dataType http://git-wip-us.apache.org/repos/asf/spark/blob/594fafc6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 39c0a2a..064c000 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -66,6 +66,33 @@ class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFun } } +class ScalaAggregateFunctionWithoutInputSchema extends UserDefinedAggregateFunction { + + def inputSchema: StructType = StructType(Nil) + + def bufferSchema: StructType = StructType(StructField("value", LongType) :: Nil) + + def dataType: DataType = LongType + + def deterministic: Boolean = true + + def initialize(buffer: MutableAggregationBuffer): Unit = { +buffer.update(0, 0L) + } + + def update(buffer: MutableAggregationBuffer, input: Row): Unit = { +buffer.update(0, input.getAs[Seq[Row]](0).map(_.getAs[Int]("v")).sum + buffer.getLong(0)) + } + + def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { +buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0)) + } + + def evaluate(buffer: Row): Any = { +buffer.getLong(0) + } +} + class LongProductSum extends UserDefinedAggregateFunction { def inputSchema: StructType = new StructType() .add("a", LongType) @@ -858,6 +885,43 @@ abstract class 
AggregationQuerySuite extends QueryTest with SQLTestUtils with Te ) } } + + test("udaf without specifying inputSchema") { +withTempTable("noInputSchemaUDAF") { + sqlContext.udf.register("noInputSchema", new ScalaAggregateFunctionWithoutInputSchema) + + val data = +Row(1, Seq(Row(1), Row(2), Row(3))) :: + Row(1, Seq(Row(4), Row(5), Row(6))) :: + Row(2, Seq(Row(-10))) :: Nil + val schema = +StructType( + StructField("key", IntegerType) :: +StructField("myArray", + ArrayType(StructType(StructField("v", IntegerType) :: Nil))) :: Nil) + sqlContext.createDataFrame( +sparkContext.parallelize(data, 2), +schema) +.registerTempTable("noInputSchemaUDAF") + + checkAnswer( +sqlContext.sql( + """ +|SELECT key, noInputSchema(myArray) +|FROM noInputSchemaUDAF +|GROUP BY
spark git commit: [SPARK-12228][SQL] Try to run execution hive's derby in memory.
Repository: spark Updated Branches: refs/heads/master bc5f56aa6 -> ec5f9ed5d [SPARK-12228][SQL] Try to run execution hive's derby in memory. This PR tries to make execution hive's derby run in memory since it is a fake metastore and every time we create a HiveContext, we will switch to a new one. It is possible that it can reduce the flakyness of our tests that need to create HiveContext (e.g. HiveSparkSubmitSuite). I will test it more. https://issues.apache.org/jira/browse/SPARK-12228 Author: Yin HuaiCloses #10204 from yhuai/derbyInMemory. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec5f9ed5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec5f9ed5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec5f9ed5 Branch: refs/heads/master Commit: ec5f9ed5de2218938dba52152475daafd4dc4786 Parents: bc5f56a Author: Yin Huai Authored: Thu Dec 10 12:04:20 2015 -0800 Committer: Yin Huai Committed: Thu Dec 10 12:04:20 2015 -0800 -- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 8 +--- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- .../org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 ++ 4 files changed, 9 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 4b928e6..03bb2c2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -83,7 +83,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { val cliConf = new HiveConf(classOf[SessionState]) // Override the location of the metastore since this is only used for local execution. -HiveContext.newTemporaryConfiguration().foreach { +HiveContext.newTemporaryConfiguration(useInMemoryDerby = false).foreach { case (key, value) => cliConf.set(key, value) } val sessionState = new CliSessionState(cliConf) http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e83941c..5958777 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -212,7 +212,7 @@ class HiveContext private[hive]( val loader = new IsolatedClientLoader( version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), execJars = Seq(), - config = newTemporaryConfiguration(), + config = newTemporaryConfiguration(useInMemoryDerby = true), isolationOn = false, baseClassLoader = Utils.getContextOrSparkClassLoader) loader.createClient().asInstanceOf[ClientWrapper] @@ -721,7 +721,9 @@ private[hive] object HiveContext { doc = "TODO") /** Constructs a configuration for hive, where the metastore is located in a temp directory. 
*/ - def newTemporaryConfiguration(): Map[String, String] = { + def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = { +val withInMemoryMode = if (useInMemoryDerby) "memory:" else "" + val tempDir = Utils.createTempDir() val localMetastore = new File(tempDir, "metastore") val propMap: HashMap[String, String] = HashMap() @@ -735,7 +737,7 @@ private[hive] object HiveContext { } propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString) propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, - s"jdbc:derby:;databaseName=${localMetastore.getAbsolutePath};create=true") + s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true") propMap.put("datanucleus.rdbms.datastoreAdapterClassName", "org.datanucleus.store.rdbms.adapter.DerbyAdapter") http://git-wip-us.apache.org/repos/asf/spark/blob/ec5f9ed5/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
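A small sketch of the two metastore connection URLs the new useInMemoryDerby flag selects between; the metastore path below is a hypothetical temp directory:

val localMetastorePath = "/tmp/spark-abc123/metastore"  // hypothetical temp dir

// useInMemoryDerby = false (e.g. the SQL CLI): on-disk derby, as before
val onDiskUrl =
  s"jdbc:derby:;databaseName=$localMetastorePath;create=true"

// useInMemoryDerby = true (the fake execution-hive metastore inside HiveContext)
val inMemoryUrl =
  s"jdbc:derby:memory:;databaseName=$localMetastorePath;create=true"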
[1/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.
Repository: spark Updated Branches: refs/heads/master ec5f9ed5d -> 2ecbe02d5 http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/mllib-clustering.md -- diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 8fbced6..48d64cd 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -1,7 +1,7 @@ --- layout: global -title: Clustering - MLlib -displayTitle: MLlib - Clustering +title: Clustering - spark.mllib +displayTitle: Clustering - spark.mllib --- [Clustering](https://en.wikipedia.org/wiki/Cluster_analysis) is an unsupervised learning problem whereby we aim to group subsets @@ -10,19 +10,19 @@ often used for exploratory analysis and/or as a component of a hierarchical [supervised learning](https://en.wikipedia.org/wiki/Supervised_learning) pipeline (in which distinct classifiers or regression models are trained for each cluster). -MLlib supports the following models: +The `spark.mllib` package supports the following models: * Table of contents {:toc} ## K-means -[k-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the +[K-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the most commonly used clustering algorithms that clusters the data points into a -predefined number of clusters. The MLlib implementation includes a parallelized +predefined number of clusters. The `spark.mllib` implementation includes a parallelized variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). -The implementation in MLlib has the following parameters: +The implementation in `spark.mllib` has the following parameters: * *k* is the number of desired clusters. * *maxIterations* is the maximum number of iterations to run. @@ -171,7 +171,7 @@ sameModel = KMeansModel.load(sc, "myModelPath") A [Gaussian Mixture Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions, -each with its own probability. The MLlib implementation uses the +each with its own probability. The `spark.mllib` implementation uses the [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) algorithm to induce the maximum-likelihood model given a set of samples. The implementation has the following parameters: @@ -308,13 +308,13 @@ graph given pairwise similarties as edge properties, described in [Lin and Cohen, Power Iteration Clustering](http://www.icml2010.org/papers/387.pdf). It computes a pseudo-eigenvector of the normalized affinity matrix of the graph via [power iteration](http://en.wikipedia.org/wiki/Power_iteration) and uses it to cluster vertices. -MLlib includes an implementation of PIC using GraphX as its backend. +`spark.mllib` includes an implementation of PIC using GraphX as its backend. It takes an `RDD` of `(srcId, dstId, similarity)` tuples and outputs a model with the clustering assignments. The similarities must be nonnegative. PIC assumes that the similarity measure is symmetric. A pair `(srcId, dstId)` regardless of the ordering should appear at most once in the input data. If a pair is missing from input, their similarity is treated as zero. 
-MLlib's PIC implementation takes the following (hyper-)parameters: +`spark.mllib`'s PIC implementation takes the following (hyper-)parameters: * `k`: number of clusters * `maxIterations`: maximum number of power iterations @@ -323,7 +323,7 @@ MLlib's PIC implementation takes the following (hyper-)parameters: **Examples** -In the following, we show code snippets to demonstrate how to use PIC in MLlib. +In the following, we show code snippets to demonstrate how to use PIC in `spark.mllib`. @@ -493,7 +493,7 @@ checkpointing can help reduce shuffle file sizes on disk and help with failure recovery. -All of MLlib's LDA models support: +All of `spark.mllib`'s LDA models support: * `describeTopics`: Returns topics as arrays of most important terms and term weights @@ -721,7 +721,7 @@ sameModel = LDAModel.load(sc, "myModelPath") ## Streaming k-means When data arrive in a stream, we may want to estimate clusters dynamically, -updating them as new data arrive. MLlib provides support for streaming k-means clustering, +updating them as new data arrive. `spark.mllib` provides support for streaming k-means clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using: http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/mllib-collaborative-filtering.md
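As a companion to the K-means parameter list in the doc excerpt above, a minimal spark.mllib run; the input vectors and parameter values are purely illustrative, and sc is assumed to be an existing SparkContext:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))

val k = 2               // number of desired clusters
val maxIterations = 20  // maximum number of iterations to run

val model = KMeans.train(points, k, maxIterations)
val wssse = model.computeCost(points)  // within-set sum of squared errors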
[2/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.
[SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation. Replaces a number of occurences of `MLlib` in the documentation that were meant to refer to the `spark.mllib` package instead. It should clarify for new users the difference between `spark.mllib` (the package) and MLlib (the umbrella project for ML in spark). It also removes some files that I forgot to delete with #10207 Author: Timothy HunterCloses #10234 from thunterdb/12212. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ecbe02d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ecbe02d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ecbe02d Branch: refs/heads/master Commit: 2ecbe02d5b28ee562d10c1735244b90a08532c9e Parents: ec5f9ed Author: Timothy Hunter Authored: Thu Dec 10 12:50:46 2015 -0800 Committer: Joseph K. Bradley Committed: Thu Dec 10 12:50:46 2015 -0800 -- docs/_data/menu-ml.yaml | 2 +- docs/_includes/nav-left-wrapper-ml.html | 4 +- docs/ml-advanced.md | 2 +- docs/ml-ann.md | 8 + docs/ml-classification-regression.md| 25 +- docs/ml-clustering.md | 4 +- docs/ml-decision-tree.md| 171 + docs/ml-ensembles.md| 319 + docs/ml-features.md | 4 +- docs/ml-guide.md| 19 +- docs/ml-intro.md| 941 --- docs/ml-linear-methods.md | 148 + docs/ml-survival-regression.md | 96 +-- docs/mllib-classification-regression.md | 6 +- docs/mllib-clustering.md| 24 +- docs/mllib-collaborative-filtering.md | 14 +- docs/mllib-data-types.md| 2 +- docs/mllib-decision-tree.md | 6 +- docs/mllib-dimensionality-reduction.md | 10 +- docs/mllib-ensembles.md | 16 +- docs/mllib-evaluation-metrics.md| 10 +- docs/mllib-feature-extraction.md| 12 +- docs/mllib-frequent-pattern-mining.md | 12 +- docs/mllib-guide.md | 2 +- docs/mllib-isotonic-regression.md | 6 +- docs/mllib-linear-methods.md| 31 +- docs/mllib-migration-guides.md | 4 +- docs/mllib-naive-bayes.md | 6 +- docs/mllib-optimization.md | 8 +- docs/mllib-pmml-model-export.md | 12 +- docs/mllib-statistics.md| 18 +- 31 files changed, 149 insertions(+), 1793 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/_data/menu-ml.yaml -- diff --git a/docs/_data/menu-ml.yaml b/docs/_data/menu-ml.yaml index fe37d05..2eea9a9 100644 --- a/docs/_data/menu-ml.yaml +++ b/docs/_data/menu-ml.yaml @@ -1,5 +1,5 @@ - text: "Overview: estimators, transformers and pipelines" - url: ml-intro.html + url: ml-guide.html - text: Extracting, transforming and selecting features url: ml-features.html - text: Classification and Regression http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/_includes/nav-left-wrapper-ml.html -- diff --git a/docs/_includes/nav-left-wrapper-ml.html b/docs/_includes/nav-left-wrapper-ml.html index 0103e89..e2d7eda 100644 --- a/docs/_includes/nav-left-wrapper-ml.html +++ b/docs/_includes/nav-left-wrapper-ml.html @@ -1,8 +1,8 @@ -spark.ml package +spark.ml package {% include nav-left.html nav=include.nav-ml %} -spark.mllib package +spark.mllib package {% include nav-left.html nav=include.nav-mllib %} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/ml-advanced.md -- diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md index b005633..91731d7 100644 --- a/docs/ml-advanced.md +++ b/docs/ml-advanced.md @@ -1,7 +1,7 @@ --- layout: global title: Advanced topics - spark.ml -displayTitle: Advanced topics +displayTitle: Advanced topics - spark.ml --- # Optimization of linear methods 
http://git-wip-us.apache.org/repos/asf/spark/blob/2ecbe02d/docs/ml-ann.md -- diff --git a/docs/ml-ann.md b/docs/ml-ann.md new file mode 100644 index 000..c2d9bd2 --- /dev/null +++ b/docs/ml-ann.md @@ -0,0 +1,8 @@ +--- +layout: global +title: Multilayer perceptron classifier - spark.ml +displayTitle: Multilayer perceptron classifier - spark.ml +--- + + > This section has
[2/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.
[SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation. Replaces a number of occurences of `MLlib` in the documentation that were meant to refer to the `spark.mllib` package instead. It should clarify for new users the difference between `spark.mllib` (the package) and MLlib (the umbrella project for ML in spark). It also removes some files that I forgot to delete with #10207 Author: Timothy HunterCloses #10234 from thunterdb/12212. (cherry picked from commit 2ecbe02d5b28ee562d10c1735244b90a08532c9e) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0307dea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0307dea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0307dea Branch: refs/heads/branch-1.6 Commit: d0307deaa29d5fcf1c675f9367c26aa6a3db3fba Parents: 594fafc Author: Timothy Hunter Authored: Thu Dec 10 12:50:46 2015 -0800 Committer: Joseph K. Bradley Committed: Thu Dec 10 12:51:00 2015 -0800 -- docs/_data/menu-ml.yaml | 2 +- docs/_includes/nav-left-wrapper-ml.html | 4 +- docs/ml-advanced.md | 2 +- docs/ml-ann.md | 8 + docs/ml-classification-regression.md| 25 +- docs/ml-clustering.md | 4 +- docs/ml-decision-tree.md| 171 + docs/ml-ensembles.md| 319 + docs/ml-features.md | 4 +- docs/ml-guide.md| 19 +- docs/ml-intro.md| 941 --- docs/ml-linear-methods.md | 148 + docs/ml-survival-regression.md | 96 +-- docs/mllib-classification-regression.md | 6 +- docs/mllib-clustering.md| 24 +- docs/mllib-collaborative-filtering.md | 14 +- docs/mllib-data-types.md| 2 +- docs/mllib-decision-tree.md | 6 +- docs/mllib-dimensionality-reduction.md | 10 +- docs/mllib-ensembles.md | 16 +- docs/mllib-evaluation-metrics.md| 10 +- docs/mllib-feature-extraction.md| 12 +- docs/mllib-frequent-pattern-mining.md | 12 +- docs/mllib-guide.md | 2 +- docs/mllib-isotonic-regression.md | 6 +- docs/mllib-linear-methods.md| 31 +- docs/mllib-migration-guides.md | 4 +- docs/mllib-naive-bayes.md | 6 +- docs/mllib-optimization.md | 8 +- docs/mllib-pmml-model-export.md | 12 +- docs/mllib-statistics.md| 18 +- 31 files changed, 149 insertions(+), 1793 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/_data/menu-ml.yaml -- diff --git a/docs/_data/menu-ml.yaml b/docs/_data/menu-ml.yaml index fe37d05..2eea9a9 100644 --- a/docs/_data/menu-ml.yaml +++ b/docs/_data/menu-ml.yaml @@ -1,5 +1,5 @@ - text: "Overview: estimators, transformers and pipelines" - url: ml-intro.html + url: ml-guide.html - text: Extracting, transforming and selecting features url: ml-features.html - text: Classification and Regression http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/_includes/nav-left-wrapper-ml.html -- diff --git a/docs/_includes/nav-left-wrapper-ml.html b/docs/_includes/nav-left-wrapper-ml.html index 0103e89..e2d7eda 100644 --- a/docs/_includes/nav-left-wrapper-ml.html +++ b/docs/_includes/nav-left-wrapper-ml.html @@ -1,8 +1,8 @@ -spark.ml package +spark.ml package {% include nav-left.html nav=include.nav-ml %} -spark.mllib package +spark.mllib package {% include nav-left.html nav=include.nav-mllib %} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/ml-advanced.md -- diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md index b005633..91731d7 100644 --- a/docs/ml-advanced.md +++ b/docs/ml-advanced.md @@ -1,7 +1,7 @@ --- layout: global title: Advanced topics - spark.ml -displayTitle: Advanced 
topics +displayTitle: Advanced topics - spark.ml --- # Optimization of linear methods http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/ml-ann.md -- diff --git a/docs/ml-ann.md b/docs/ml-ann.md new file mode 100644 index 000..c2d9bd2 --- /dev/null +++ b/docs/ml-ann.md @@ -0,0 +1,8 @@ +--- +layout: global +title:
[1/2] spark git commit: [SPARK-12212][ML][DOC] Clarifies the difference between spark.ml, spark.mllib and mllib in the documentation.
Repository: spark Updated Branches: refs/heads/branch-1.6 594fafc61 -> d0307deaa http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/mllib-clustering.md -- diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 8fbced6..48d64cd 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -1,7 +1,7 @@ --- layout: global -title: Clustering - MLlib -displayTitle: MLlib - Clustering +title: Clustering - spark.mllib +displayTitle: Clustering - spark.mllib --- [Clustering](https://en.wikipedia.org/wiki/Cluster_analysis) is an unsupervised learning problem whereby we aim to group subsets @@ -10,19 +10,19 @@ often used for exploratory analysis and/or as a component of a hierarchical [supervised learning](https://en.wikipedia.org/wiki/Supervised_learning) pipeline (in which distinct classifiers or regression models are trained for each cluster). -MLlib supports the following models: +The `spark.mllib` package supports the following models: * Table of contents {:toc} ## K-means -[k-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the +[K-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the most commonly used clustering algorithms that clusters the data points into a -predefined number of clusters. The MLlib implementation includes a parallelized +predefined number of clusters. The `spark.mllib` implementation includes a parallelized variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). -The implementation in MLlib has the following parameters: +The implementation in `spark.mllib` has the following parameters: * *k* is the number of desired clusters. * *maxIterations* is the maximum number of iterations to run. @@ -171,7 +171,7 @@ sameModel = KMeansModel.load(sc, "myModelPath") A [Gaussian Mixture Model](http://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions, -each with its own probability. The MLlib implementation uses the +each with its own probability. The `spark.mllib` implementation uses the [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) algorithm to induce the maximum-likelihood model given a set of samples. The implementation has the following parameters: @@ -308,13 +308,13 @@ graph given pairwise similarties as edge properties, described in [Lin and Cohen, Power Iteration Clustering](http://www.icml2010.org/papers/387.pdf). It computes a pseudo-eigenvector of the normalized affinity matrix of the graph via [power iteration](http://en.wikipedia.org/wiki/Power_iteration) and uses it to cluster vertices. -MLlib includes an implementation of PIC using GraphX as its backend. +`spark.mllib` includes an implementation of PIC using GraphX as its backend. It takes an `RDD` of `(srcId, dstId, similarity)` tuples and outputs a model with the clustering assignments. The similarities must be nonnegative. PIC assumes that the similarity measure is symmetric. A pair `(srcId, dstId)` regardless of the ordering should appear at most once in the input data. If a pair is missing from input, their similarity is treated as zero. 
-MLlib's PIC implementation takes the following (hyper-)parameters: +`spark.mllib`'s PIC implementation takes the following (hyper-)parameters: * `k`: number of clusters * `maxIterations`: maximum number of power iterations @@ -323,7 +323,7 @@ MLlib's PIC implementation takes the following (hyper-)parameters: **Examples** -In the following, we show code snippets to demonstrate how to use PIC in MLlib. +In the following, we show code snippets to demonstrate how to use PIC in `spark.mllib`. @@ -493,7 +493,7 @@ checkpointing can help reduce shuffle file sizes on disk and help with failure recovery. -All of MLlib's LDA models support: +All of `spark.mllib`'s LDA models support: * `describeTopics`: Returns topics as arrays of most important terms and term weights @@ -721,7 +721,7 @@ sameModel = LDAModel.load(sc, "myModelPath") ## Streaming k-means When data arrive in a stream, we may want to estimate clusters dynamically, -updating them as new data arrive. MLlib provides support for streaming k-means clustering, +updating them as new data arrive. `spark.mllib` provides support for streaming k-means clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using: http://git-wip-us.apache.org/repos/asf/spark/blob/d0307dea/docs/mllib-collaborative-filtering.md
spark git commit: [SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.
Repository: spark Updated Branches: refs/heads/master 2ecbe02d5 -> 4a46b8859 [SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes. This avoids bringing up yet another HTTP server on the driver, and instead reuses the file server already managed by the driver's RpcEnv. As a bonus, the repl now inherits the security features of the network library. There's also a small change to create the directory for storing classes under the root temp dir for the application (instead of directly under java.io.tmpdir). Author: Marcelo VanzinCloses #9923 from vanzin/SPARK-11563. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a46b885 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a46b885 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a46b885 Branch: refs/heads/master Commit: 4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c Parents: 2ecbe02 Author: Marcelo Vanzin Authored: Thu Dec 10 13:26:30 2015 -0800 Committer: Marcelo Vanzin Committed: Thu Dec 10 13:26:30 2015 -0800 -- .../scala/org/apache/spark/HttpFileServer.scala | 5 ++ .../scala/org/apache/spark/HttpServer.scala | 26 +++ .../scala/org/apache/spark/SparkContext.scala | 6 +++ .../org/apache/spark/executor/Executor.scala| 6 +-- .../scala/org/apache/spark/rpc/RpcEnv.scala | 18 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala | 5 ++ .../spark/rpc/netty/NettyStreamManager.scala| 22 - .../org/apache/spark/rpc/RpcEnvSuite.scala | 25 +- docs/configuration.md | 8 docs/security.md| 8 .../org/apache/spark/repl/SparkILoop.scala | 17 --- .../org/apache/spark/repl/SparkIMain.scala | 28 ++-- .../main/scala/org/apache/spark/repl/Main.scala | 23 +- .../apache/spark/repl/ExecutorClassLoader.scala | 36 --- .../spark/repl/ExecutorClassLoaderSuite.scala | 48 15 files changed, 183 insertions(+), 98 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a46b885/core/src/main/scala/org/apache/spark/HttpFileServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 7cf7bc0..77d8ec9 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -71,6 +71,11 @@ private[spark] class HttpFileServer( serverUri + "/jars/" + file.getName } + def addDirectory(path: String, resourceBase: String): String = { +httpServer.addDirectory(path, resourceBase) +serverUri + path + } + def addFileToDir(file: File, dir: File) : String = { // Check whether the file is a directory. If it is, throw a more meaningful exception. 
// If we don't catch this, Guava throws a very confusing error message: http://git-wip-us.apache.org/repos/asf/spark/blob/4a46b885/core/src/main/scala/org/apache/spark/HttpServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 8de3a6c..faa3ef3 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -23,10 +23,9 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} - import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.bio.SocketConnector -import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler} +import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} import org.eclipse.jetty.util.thread.QueuedThreadPool import org.apache.spark.util.Utils @@ -52,6 +51,11 @@ private[spark] class HttpServer( private var server: Server = null private var port: Int = requestedPort + private val servlets = { +val handler = new ServletContextHandler() +handler.setContextPath("/") +handler + } def start() { if (server != null) { @@ -65,6 +69,14 @@ private[spark] class HttpServer( } } + def addDirectory(contextPath: String, resourceBase: String): Unit = { +val holder = new ServletHolder() +holder.setInitParameter("resourceBase", resourceBase) +holder.setInitParameter("pathInfoOnly", "true") +
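A sketch of the directory-serving hook added to HttpFileServer above; the context path and local directory are hypothetical, and in the actual change the REPL registers its output directory through the driver's RpcEnv file server rather than calling this internal API directly:

// Assumes httpFileServer is an existing HttpFileServer instance (hypothetical setup).
val replOutputDir = "/tmp/spark-repl-classes"  // where the REPL writes compiled classes
val classUri = httpFileServer.addDirectory("/classes", replOutputDir)
// classUri is "<serverUri>/classes"; executors resolve REPL-generated classes
// against it instead of a separate, standalone HTTP class server.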
spark git commit: [SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Repository: spark Updated Branches: refs/heads/master 4a46b8859 -> 6a6c1fc5c [SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark Adding ability to define an initial state RDD for use with updateStateByKey PySpark. Added unit test and changed stateful_network_wordcount example to use initial RDD. Author: Bryan CutlerCloses #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a6c1fc5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a6c1fc5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a6c1fc5 Branch: refs/heads/master Commit: 6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a Parents: 4a46b88 Author: Bryan Cutler Authored: Thu Dec 10 14:21:15 2015 -0800 Committer: Davies Liu Committed: Thu Dec 10 14:21:15 2015 -0800 -- .../streaming/stateful_network_wordcount.py | 5 - python/pyspark/streaming/dstream.py | 13 +++-- python/pyspark/streaming/tests.py | 20 .../streaming/api/python/PythonDStream.scala| 14 -- 4 files changed, 47 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/examples/src/main/python/streaming/stateful_network_wordcount.py -- diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py index 16ef646..f8bbc65 100644 --- a/examples/src/main/python/streaming/stateful_network_wordcount.py +++ b/examples/src/main/python/streaming/stateful_network_wordcount.py @@ -44,13 +44,16 @@ if __name__ == "__main__": ssc = StreamingContext(sc, 1) ssc.checkpoint("checkpoint") +# RDD with initial state (key, value) pairs +initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) + def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) running_counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ - .updateStateByKey(updateFunc) + .updateStateByKey(updateFunc, initialRDD=initialStateRDD) running_counts.pprint() http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/python/pyspark/streaming/dstream.py -- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index acec850..f61137c 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -568,7 +568,7 @@ class DStream(object): self._ssc._jduration(slideDuration)) return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) -def updateStateByKey(self, updateFunc, numPartitions=None): +def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): """ Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key. 
@@ -579,6 +579,9 @@ class DStream(object): if numPartitions is None: numPartitions = self._sc.defaultParallelism +if initialRDD and not isinstance(initialRDD, RDD): +initialRDD = self._sc.parallelize(initialRDD) + def reduceFunc(t, a, b): if a is None: g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) @@ -590,7 +593,13 @@ class DStream(object): jreduceFunc = TransformFunction(self._sc, reduceFunc, self._sc.serializer, self._jrdd_deserializer) -dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc) +if initialRDD: +initialRDD = initialRDD._reserialize(self._jrdd_deserializer) +dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc, + initialRDD._jrdd) +else: +dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) http://git-wip-us.apache.org/repos/asf/spark/blob/6a6c1fc5/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py
spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
Repository: spark Updated Branches: refs/heads/branch-1.4 c7c99857d -> 43f02e41e [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc With the merge of [SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), now the Python API has the same functionalities compared to Scala/Java, so here changing the description to make it more precise. zsxwing tdas , please review, thanks a lot. Author: jerryshaoCloses #10246 from jerryshao/direct-kafka-doc-update. (cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f02e41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f02e41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f02e41 Branch: refs/heads/branch-1.4 Commit: 43f02e41e36256c2b90ee06d8b380ff2dfc7cabf Parents: c7c9985 Author: jerryshao Authored: Thu Dec 10 15:31:46 2015 -0800 Committer: Shixiong Zhu Committed: Thu Dec 10 15:32:16 2015 -0800 -- docs/streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43f02e41/docs/streaming-kafka-integration.md -- diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index 79b811c..100f637 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming application. [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. ## Approach 2: Direct Approach (No Receivers) -This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it is not yet at full feature parity. +This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API. This approach has the following advantages over the receiver-based approach (i.e. Approach 1). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
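For context, a minimal Scala sketch of the direct approach the updated paragraph describes; the broker address and topic name are placeholders, and sc is assumed to be an existing SparkContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")  // placeholder
val topics = Set("pageviews")                                                     // placeholder

// No receivers: each batch reads a defined offset range directly from Kafka.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.map(_._2).count().print()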
spark git commit: [SPARK-12258][SQL] passing null into ScalaUDF
Repository: spark Updated Branches: refs/heads/branch-1.6 5d3722f8e -> d09af2cb4 [SPARK-12258][SQL] passing null into ScalaUDF Check nullability and passing them into ScalaUDF. Closes #10249 Author: Davies LiuCloses #10259 from davies/udf_null. (cherry picked from commit b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d09af2cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d09af2cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d09af2cb Branch: refs/heads/branch-1.6 Commit: d09af2cb4237cca9ac72aacb9abb822a2982a820 Parents: 5d3722f Author: Davies Liu Authored: Thu Dec 10 17:22:18 2015 -0800 Committer: Yin Huai Committed: Thu Dec 10 17:22:57 2015 -0800 -- .../apache/spark/sql/catalyst/expressions/ScalaUDF.scala| 7 +-- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala| 9 + 2 files changed, 10 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d09af2cb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 03b8922..5deb2f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -1029,8 +1029,11 @@ case class ScalaUDF( // such as IntegerType, its javaType is `int` and the returned type of user-defined // function is Object. Trying to convert an Object to `int` will cause casting exception. val evalCode = evals.map(_.code).mkString -val funcArguments = converterTerms.zip(evals).map { - case (converter, eval) => s"$converter.apply(${eval.value})" +val funcArguments = converterTerms.zipWithIndex.map { + case (converter, i) => +val eval = evals(i) +val dt = children(i).dataType +s"$converter.apply(${eval.isNull} ? 
null : (${ctx.boxedType(dt)}) ${eval.value})" }.mkString(",") val callFunc = s"${ctx.boxedType(ctx.javaType(dataType))} $resultTerm = " + s"(${ctx.boxedType(ctx.javaType(dataType))})${catalystConverterTerm}" + http://git-wip-us.apache.org/repos/asf/spark/blob/d09af2cb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 76e9648..ce4fb23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1131,14 +1131,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-11725: correctly handle null inputs for ScalaUDF") { -val df = Seq( +val df = sparkContext.parallelize(Seq( new java.lang.Integer(22) -> "John", - null.asInstanceOf[java.lang.Integer] -> "Lucy").toDF("age", "name") + null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name") +// passing null into the UDF that could handle it val boxedUDF = udf[java.lang.Integer, java.lang.Integer] { - (i: java.lang.Integer) => if (i == null) null else i * 2 + (i: java.lang.Integer) => if (i == null) -10 else i * 2 } -checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(null) :: Nil) +checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(-10) :: Nil) val primitiveUDF = udf((i: Int) => i * 2) checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
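The behavioral contrast the updated test checks, as a minimal sketch (the DataFrame and column name are illustrative):

import org.apache.spark.sql.functions.udf

// Boxed input type: the null reaches the function body, which decides what to return.
val boxedTimesTwo = udf[java.lang.Integer, java.lang.Integer] { (i: java.lang.Integer) =>
  if (i == null) -10 else i * 2
}

// Primitive input type: the generated code never calls the function for a null input;
// the result for that row is simply null.
val primitiveTimesTwo = udf((i: Int) => i * 2)

// Given a DataFrame df with a nullable integer column "age" (illustrative):
//   df.select(boxedTimesTwo($"age"))      // null input -> -10
//   df.select(primitiveTimesTwo($"age"))  // null input -> null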
spark git commit: [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management
Repository: spark Updated Branches: refs/heads/branch-1.6 9870e5c7a -> c247b6a65 [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management **Problem.** In unified memory management, acquiring execution memory may lead to eviction of storage memory. However, the space freed from evicting cached blocks is distributed among all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause the acquisition to fail, leading to OOM's and premature spills. **Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction` is 0.4, and there are two active tasks. In this case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to acquire 200B, it will evict 100B of storage but can only acquire 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233) that I stole from JoshRosen. **Solution.** Fix the cap on task execution memory. It should take into account the space that could have been freed by storage in addition to the current amount of memory available to execution. In the example above, the correct cap should have been 600B / 2 = 300B. This patch also guards against the race condition (SPARK-12253): (1) Existing tasks collectively occupy all execution memory (2) New task comes in and blocks while existing tasks spill (3) After tasks finish spilling, another task jumps in and puts in a large block, stealing the freed memory (4) New task still cannot acquire memory and goes back to sleep Author: Andrew OrCloses #10240 from andrewor14/fix-oom. (cherry picked from commit 5030923ea8bb94ac8fa8e432de9fc7089aa93986) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c247b6a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c247b6a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c247b6a6 Branch: refs/heads/branch-1.6 Commit: c247b6a6546e12e3c6992c40cad1881d56aefd6f Parents: 9870e5c Author: Andrew Or Authored: Thu Dec 10 15:30:08 2015 -0800 Committer: Andrew Or Committed: Thu Dec 10 15:30:14 2015 -0800 -- .../spark/memory/ExecutionMemoryPool.scala | 57 ++-- .../spark/memory/UnifiedMemoryManager.scala | 57 +++- .../scala/org/apache/spark/scheduler/Task.scala | 6 +++ .../memory/UnifiedMemoryManagerSuite.scala | 25 + 4 files changed, 114 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c247b6a6/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala index 9023e1a..dbb0ad8 100644 --- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool( * active tasks) before it is forced to spill. This can happen if the number of tasks increase * but an older task had a lot of memory already. * + * @param numBytes number of bytes to acquire + * @param taskAttemptId the task attempt acquiring memory + * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in + * one parameter (Long) that represents the desired amount of memory by + * which this pool should be expanded. 
+ * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool + * at this given moment. This is not a field because the max pool + * size is variable in certain cases. For instance, in unified + * memory management, the execution pool can be expanded by evicting + * cached blocks, thereby shrinking the storage pool. + * * @return the number of bytes granted to the task. */ - def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized { + private[memory] def acquireMemory( + numBytes: Long, + taskAttemptId: Long, + maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, + computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") +// TODO: clean up this clunky method
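Editor's note: to make the numbers in the commit message above concrete, here is a small self-contained Scala sketch of the per-task cap arithmetic before and after the fix. It is a back-of-the-envelope illustration only; the variable names are invented and do not correspond to actual fields in `ExecutionMemoryPool` or `UnifiedMemoryManager`.

```scala
object TaskCapArithmetic {
  def main(args: Array[String]): Unit = {
    // Figures from the commit message: 1000B total, 900B cached,
    // spark.memory.storageFraction = 0.4, two active tasks.
    val totalMemory       = 1000L
    val cachedStorage     = 900L
    val storageRegionSize = (0.4 * totalMemory).toLong   // 400B protected from eviction
    val numActiveTasks    = 2

    // Old (incorrect) cap: only memory not already used by storage is counted.
    val oldCap = (totalMemory - cachedStorage) / numActiveTasks             // 100B / 2 = 50B

    // Fixed cap: also count cached blocks that execution could evict,
    // i.e. everything above the protected storage region.
    val evictable = math.max(0L, cachedStorage - storageRegionSize)         // 500B
    val newCap = (totalMemory - cachedStorage + evictable) / numActiveTasks // 600B / 2 = 300B

    println(s"old cap = ${oldCap}B, corrected cap = ${newCap}B")
  }
}
```

With the old cap, task A's 200B request in the example is granted only 50B even after evicting 100B of storage; under the corrected 300B cap the request can be granted in full.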
spark git commit: [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management
Repository: spark Updated Branches: refs/heads/master 23a9e62ba -> 5030923ea [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management **Problem.** In unified memory management, acquiring execution memory may lead to eviction of storage memory. However, the space freed from evicting cached blocks is distributed among all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause the acquisition to fail, leading to OOM's and premature spills. **Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction` is 0.4, and there are two active tasks. In this case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to acquire 200B, it will evict 100B of storage but can only acquire 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233) that I stole from JoshRosen. **Solution.** Fix the cap on task execution memory. It should take into account the space that could have been freed by storage in addition to the current amount of memory available to execution. In the example above, the correct cap should have been 600B / 2 = 300B. This patch also guards against the race condition (SPARK-12253): (1) Existing tasks collectively occupy all execution memory (2) New task comes in and blocks while existing tasks spill (3) After tasks finish spilling, another task jumps in and puts in a large block, stealing the freed memory (4) New task still cannot acquire memory and goes back to sleep Author: Andrew OrCloses #10240 from andrewor14/fix-oom. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5030923e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5030923e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5030923e Branch: refs/heads/master Commit: 5030923ea8bb94ac8fa8e432de9fc7089aa93986 Parents: 23a9e62 Author: Andrew Or Authored: Thu Dec 10 15:30:08 2015 -0800 Committer: Andrew Or Committed: Thu Dec 10 15:30:08 2015 -0800 -- .../spark/memory/ExecutionMemoryPool.scala | 57 ++-- .../spark/memory/UnifiedMemoryManager.scala | 57 +++- .../scala/org/apache/spark/scheduler/Task.scala | 6 +++ .../memory/UnifiedMemoryManagerSuite.scala | 25 + 4 files changed, 114 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala index 9023e1a..dbb0ad8 100644 --- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool( * active tasks) before it is forced to spill. This can happen if the number of tasks increase * but an older task had a lot of memory already. * + * @param numBytes number of bytes to acquire + * @param taskAttemptId the task attempt acquiring memory + * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in + * one parameter (Long) that represents the desired amount of memory by + * which this pool should be expanded. 
+ * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool + * at this given moment. This is not a field because the max pool + * size is variable in certain cases. For instance, in unified + * memory management, the execution pool can be expanded by evicting + * cached blocks, thereby shrinking the storage pool. + * * @return the number of bytes granted to the task. */ - def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized { + private[memory] def acquireMemory( + numBytes: Long, + taskAttemptId: Long, + maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, + computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") +// TODO: clean up this clunky method signature + // Add this task to the taskMemory map just so we can keep an accurate count of the number // of active
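Editor's note: the diff above only shows the new `acquireMemory` signature; the sketch below illustrates, with invented stand-in classes, how a caller that owns both pools might supply the two callbacks. It is a simplified illustration of the coordination pattern, not the actual `UnifiedMemoryManager` code.

```scala
object CallbackSketch {

  // Toy pool; stands in for both the execution and storage pools.
  class Pool(var poolSize: Long) {
    var memoryUsed: Long = 0L
    def incrementPoolSize(delta: Long): Unit = poolSize += delta
    def decrementPoolSize(delta: Long): Unit = poolSize -= delta

    // Mirrors the new acquireMemory shape: the caller supplies how the pool
    // may grow and how large it may become at this moment.
    def acquireMemory(
        numBytes: Long,
        taskAttemptId: Long,
        maybeGrowPool: Long => Unit,
        computeMaxPoolSize: () => Long): Long = {
      val memoryFree = poolSize - memoryUsed
      maybeGrowPool(numBytes - memoryFree)
      val maxPerTask = computeMaxPoolSize() / 2   // pretend exactly two active tasks
      val granted = math.max(0L, math.min(numBytes, math.min(maxPerTask, poolSize - memoryUsed)))
      memoryUsed += granted
      granted
    }
  }

  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val storageRegionSize = 400L       // spark.memory.storageFraction = 0.4
    val storage = new Pool(900L)
    storage.memoryUsed = 900L          // 900B of cached blocks
    val execution = new Pool(100L)

    // Grow execution by evicting cached blocks above the protected region.
    def maybeGrowExecutionPool(needed: Long): Unit = if (needed > 0) {
      val reclaimable = math.max(0L, storage.memoryUsed - storageRegionSize)
      val reclaimed = math.min(needed, reclaimable)
      storage.memoryUsed -= reclaimed
      storage.decrementPoolSize(reclaimed)
      execution.incrementPoolSize(reclaimed)
    }

    // Largest size the execution pool could reach right now.
    def computeMaxExecutionPoolSize(): Long =
      maxMemory - math.min(storage.memoryUsed, storageRegionSize)

    val granted = execution.acquireMemory(
      200L, 0L, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize())
    println(s"granted = ${granted}B")  // 200B, rather than being capped at 50B
  }
}
```

Passing the growth and sizing logic in as callbacks keeps the execution pool agnostic of storage while still letting a unified manager evict cached blocks on demand, inside the same lock.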
spark git commit: [SPARK-12251] Document and improve off-heap memory configurations
Repository: spark Updated Branches: refs/heads/branch-1.6 d0307deaa -> 9870e5c7a [SPARK-12251] Document and improve off-heap memory configurations This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs. - Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6). - Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix. - Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion. - Document these configurations on the configuration page. Author: Josh RosenCloses #10237 from JoshRosen/SPARK-12251. (cherry picked from commit 23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9870e5c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9870e5c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9870e5c7 Branch: refs/heads/branch-1.6 Commit: 9870e5c7af87190167ca3845ede918671b9420ca Parents: d0307de Author: Josh Rosen Authored: Thu Dec 10 15:29:04 2015 -0800 Committer: Andrew Or Committed: Thu Dec 10 15:29:13 2015 -0800 -- .../main/scala/org/apache/spark/SparkConf.scala | 4 +++- .../org/apache/spark/memory/MemoryManager.scala | 10 -- .../spark/memory/TaskMemoryManagerSuite.java| 21 .../shuffle/sort/PackedRecordPointerSuite.java | 6 -- .../sort/ShuffleInMemorySorterSuite.java| 4 ++-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../map/AbstractBytesToBytesMapSuite.java | 4 ++-- .../unsafe/sort/UnsafeExternalSorterSuite.java | 2 +- .../unsafe/sort/UnsafeInMemorySorterSuite.java | 4 ++-- .../spark/memory/StaticMemoryManagerSuite.scala | 2 +- .../memory/UnifiedMemoryManagerSuite.scala | 2 +- docs/configuration.md | 16 +++ .../sql/execution/joins/HashedRelation.scala| 6 +- .../UnsafeFixedWidthAggregationMapSuite.scala | 2 +- .../execution/UnsafeKVExternalSorterSuite.scala | 2 +- 15 files changed, 65 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9870e5c7/core/src/main/scala/org/apache/spark/SparkConf.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 19633a3..d3384fb 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging { "spark.streaming.fileStream.minRememberDuration" -> Seq( AlternateConfig("spark.streaming.minRememberDuration", "1.5")), "spark.yarn.max.executor.failures" -> Seq( - AlternateConfig("spark.yarn.max.worker.failures", "1.5")) + AlternateConfig("spark.yarn.max.worker.failures", "1.5")), +"spark.memory.offHeap.enabled" -> Seq( + AlternateConfig("spark.unsafe.offHeap", "1.6")) ) /** 
http://git-wip-us.apache.org/repos/asf/spark/blob/9870e5c7/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index ae9e1ac..e707e27 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager( storageMemoryPool.incrementPoolSize(storageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) - offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0)) + offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0)) /** * Total available memory
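Editor's note: for reference, a minimal sketch of how the two renamed settings fit together after this patch; the application name and the 2g figure are arbitrary examples.

```scala
import org.apache.spark.SparkConf

// Enabling off-heap memory now requires a non-zero size; with the new
// validation, offHeap.enabled=true together with offHeap.size=0 is rejected
// up front instead of failing later with an OOM.
val conf = new SparkConf()
  .setAppName("offheap-example")                 // arbitrary example name
  .set("spark.memory.offHeap.enabled", "true")   // replaces the deprecated spark.unsafe.offHeap
  .set("spark.memory.offHeap.size", "2g")        // was spark.memory.offHeapSize; must be > 0 when enabled
```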
spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
Repository: spark Updated Branches: refs/heads/branch-1.5 4b99f72f7 -> cb0246c93 [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc With the merge of [SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), now the Python API has the same functionalities compared to Scala/Java, so here changing the description to make it more precise. zsxwing tdas , please review, thanks a lot. Author: jerryshaoCloses #10246 from jerryshao/direct-kafka-doc-update. (cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb0246c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb0246c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb0246c9 Branch: refs/heads/branch-1.5 Commit: cb0246c9314892dbb3403488154b2d987c90b1dd Parents: 4b99f72 Author: jerryshao Authored: Thu Dec 10 15:31:46 2015 -0800 Committer: Shixiong Zhu Committed: Thu Dec 10 15:32:05 2015 -0800 -- docs/streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb0246c9/docs/streaming-kafka-integration.md -- diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index ab7f011..d58f4f6 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming application. [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. ## Approach 2: Direct Approach (No Receivers) -This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it is not yet at full feature parity. +This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API. This approach has the following advantages over the receiver-based approach (i.e. Approach 1). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
Repository: spark Updated Branches: refs/heads/master 5030923ea -> 24d3357d6 [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc With the merge of [SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), now the Python API has the same functionalities compared to Scala/Java, so here changing the description to make it more precise. zsxwing tdas , please review, thanks a lot. Author: jerryshaoCloses #10246 from jerryshao/direct-kafka-doc-update. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24d3357d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24d3357d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24d3357d Branch: refs/heads/master Commit: 24d3357d66e14388faf8709b368edca70ea96432 Parents: 5030923 Author: jerryshao Authored: Thu Dec 10 15:31:46 2015 -0800 Committer: Shixiong Zhu Committed: Thu Dec 10 15:31:46 2015 -0800 -- docs/streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24d3357d/docs/streaming-kafka-integration.md -- diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index b00351b..5be73c4 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming application. [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. ## Approach 2: Direct Approach (No Receivers) -This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it is not yet at full feature parity. +This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API. This approach has the following advantages over the receiver-based approach (i.e. Approach 1). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
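Editor's note: the paragraph being edited describes the direct approach only in prose, so a minimal Scala sketch of creating such a stream with the 1.x API covered by that guide is included below. The broker addresses and topic name are placeholders.

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("direct-kafka-example")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Placeholder broker list and topic. No receivers are used: the stream
// computes offset ranges per batch and reads them with the simple consumer API.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

messages.map(_._2).print()   // print message values each batch
ssc.start()
ssc.awaitTermination()
```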
spark git commit: [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit
Repository: spark Updated Branches: refs/heads/branch-1.6 b7b9f7727 -> e65c88536 [SPARK-11602][MLLIB] Refine visibility for 1.6 scala API audit jira: https://issues.apache.org/jira/browse/SPARK-11602 Made a pass on the API change of 1.6. Open the PR for efficient discussion. Author: Yuhao YangCloses #9939 from hhbyyh/auditScala. (cherry picked from commit 9fba9c8004d2b97549e5456fa7918965bec27336) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e65c8853 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e65c8853 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e65c8853 Branch: refs/heads/branch-1.6 Commit: e65c88536ad1843a45e0fe3cf1edadfdf4ad3460 Parents: b7b9f77 Author: Yuhao Yang Authored: Thu Dec 10 10:15:50 2015 -0800 Committer: Joseph K. Bradley Committed: Thu Dec 10 10:16:04 2015 -0800 -- .../src/main/scala/org/apache/spark/ml/feature/package-info.java | 2 +- .../scala/org/apache/spark/ml/regression/LinearRegression.scala | 2 +- .../org/apache/spark/mllib/clustering/BisectingKMeans.scala | 2 +- .../org/apache/spark/mllib/clustering/BisectingKMeansModel.scala | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java index c22d2e0..7a35f2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/package-info.java @@ -23,7 +23,7 @@ * features into more suitable forms for model fitting. * Most feature transformers are implemented as {@link org.apache.spark.ml.Transformer}s, which * transforms one {@link org.apache.spark.sql.DataFrame} into another, e.g., - * {@link org.apache.spark.feature.HashingTF}. + * {@link org.apache.spark.ml.feature.HashingTF}. * Some feature transformers are implemented as {@link org.apache.spark.ml.Estimator}}s, because the * transformation requires some aggregated information of the dataset, e.g., document * frequencies in {@link org.apache.spark.ml.feature.IDF}. 
http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 1db9166..5e58509 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -529,7 +529,7 @@ class LinearRegressionSummary private[regression] ( val predictionCol: String, val labelCol: String, val model: LinearRegressionModel, -val diagInvAtWA: Array[Double]) extends Serializable { +private val diagInvAtWA: Array[Double]) extends Serializable { @transient private val metrics = new RegressionMetrics( predictions http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 82adfa6..54bf510 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -407,7 +407,7 @@ private object BisectingKMeans extends Serializable { */ @Since("1.6.0") @Experimental -class ClusteringTreeNode private[clustering] ( +private[clustering] class ClusteringTreeNode private[clustering] ( val index: Int, val size: Long, private val centerWithNorm: VectorWithNorm, http://git-wip-us.apache.org/repos/asf/spark/blob/e65c8853/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index f942e56..9ccf96b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++
spark git commit: [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument
Repository: spark Updated Branches: refs/heads/branch-1.6 e65c88536 -> 93ef24638 [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument Fix ```subset``` function error when only set ```select``` argument. Please refer to the [JIRA](https://issues.apache.org/jira/browse/SPARK-12234) about the error and how to reproduce it. cc sun-rui felixcheung shivaram Author: Yanbo LiangCloses #10217 from yanboliang/spark-12234. (cherry picked from commit d9d354ed40eec56b3f03d32f4e2629d367b1bf02) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ef2463 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ef2463 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ef2463 Branch: refs/heads/branch-1.6 Commit: 93ef2463820928f434f9fb1542bc30cfb1cec9aa Parents: e65c885 Author: Yanbo Liang Authored: Thu Dec 10 10:18:58 2015 -0800 Committer: Shivaram Venkataraman Committed: Thu Dec 10 10:19:06 2015 -0800 -- R/pkg/R/DataFrame.R | 9 +++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93ef2463/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 81b4e6b..f4c4a25 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1185,7 +1185,7 @@ setMethod("[", signature(x = "DataFrame", i = "Column"), #' #' Return subsets of DataFrame according to given conditions #' @param x A DataFrame -#' @param subset A logical expression to filter on rows +#' @param subset (Optional) A logical expression to filter on rows #' @param select expression for the single Column or a list of columns to select from the DataFrame #' @return A new DataFrame containing only the rows that meet the condition with selected columns #' @export @@ -1206,10 +1206,15 @@ setMethod("[", signature(x = "DataFrame", i = "Column"), #' df[df$age %in% c(19, 30), 1:2] #' subset(df, df$age %in% c(19, 30), 1:2) #' subset(df, df$age %in% c(19), select = c(1,2)) +#' subset(df, select = c(1,2)) #' } setMethod("subset", signature(x = "DataFrame"), function(x, subset, select, ...) { -x[subset, select, ...] +if (missing(subset)) { + x[, select, ...] +} else { + x[subset, select, ...] +} }) #' Select http://git-wip-us.apache.org/repos/asf/spark/blob/93ef2463/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 222c04a..2051784 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -799,6 +799,10 @@ test_that("subsetting", { expect_equal(count(df6), 1) expect_equal(columns(df6), c("name", "age")) + df7 <- subset(df, select = "name") + expect_equal(count(df7), 3) + expect_equal(columns(df7), c("name")) + # Test base::subset is working expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile
Repository: spark Updated Branches: refs/heads/branch-1.6 f939c71b1 -> b7b9f7727 [SPARK-12198][SPARKR] SparkR support read.parquet and deprecate parquetFile SparkR support ```read.parquet``` and deprecate ```parquetFile```. This change is similar with #10145 for ```jsonFile```. Author: Yanbo LiangCloses #10191 from yanboliang/spark-12198. (cherry picked from commit eeb58722ad73441eeb5f35f864be3c5392cfd426) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7b9f772 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7b9f772 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7b9f772 Branch: refs/heads/branch-1.6 Commit: b7b9f772751dc4ea7eb28a2bdb897a04e563fafa Parents: f939c71 Author: Yanbo Liang Authored: Thu Dec 10 09:44:53 2015 -0800 Committer: Shivaram Venkataraman Committed: Thu Dec 10 09:45:01 2015 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/SQLContext.R | 16 ++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++ 3 files changed, 22 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 565a2b1..ba64bc5 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -270,6 +270,7 @@ export("as.DataFrame", "loadDF", "parquetFile", "read.df", + "read.parquet", "sql", "table", "tableNames", http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 85541c8..f678c70 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -256,18 +256,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { } } - #' Create a DataFrame from a Parquet file. #' #' Loads a Parquet file, returning the result as a DataFrame. #' #' @param sqlContext SQLContext to use -#' @param ... Path(s) of parquet file(s) to read. +#' @param path Path of file to read. A vector of multiple paths is allowed. #' @return DataFrame +#' @rdname read.parquet +#' @name read.parquet #' @export +read.parquet <- function(sqlContext, path) { + # Allow the user to have a more flexible definiton of the text file path + paths <- as.list(suppressWarnings(normalizePath(path))) + read <- callJMethod(sqlContext, "read") + sdf <- callJMethod(read, "parquet", paths) + dataFrame(sdf) +} +#' @rdname read.parquet +#' @name parquetFile +#' @export # TODO: Implement saveasParquetFile and write examples for both parquetFile <- function(sqlContext, ...) 
{ + .Deprecated("read.parquet") # Allow the user to have a more flexible definiton of the text file path paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x))) sdf <- callJMethod(sqlContext, "parquetFile", paths) http://git-wip-us.apache.org/repos/asf/spark/blob/b7b9f772/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 39fc94a..222c04a 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1420,22 +1420,25 @@ test_that("mutate(), transform(), rename() and names()", { detach(airquality) }) -test_that("write.df() on DataFrame and works with parquetFile", { +test_that("write.df() on DataFrame and works with read.parquet", { df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlContext, parquetPath) + parquetDF <- read.parquet(sqlContext, parquetPath) expect_is(parquetDF, "DataFrame") expect_equal(count(df), count(parquetDF)) }) -test_that("parquetFile works with multiple input paths", { +test_that("read.parquet()/parquetFile() works with multiple input paths", { df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") write.df(df, parquetPath2, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2) + parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2)) expect_is(parquetDF, "DataFrame") expect_equal(count(parquetDF), count(df) * 2) + parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2)) + expect_is(parquetDF2, "DataFrame") +
spark git commit: [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters
Repository: spark Updated Branches: refs/heads/branch-1.6 f6d866173 -> b5e5812f9 [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters The original code does not properly handle the cases where the prefix is null, but suffix is not null - the suffix should be used but is not. The fix is using StringBuilder to construct the proper file name. Author: bomengAuthor: Bo Meng Closes #10185 from bomeng/SPARK-12136. (cherry picked from commit e29704f90dfe67d9e276d242699ac0a00f64fb91) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5e5812f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5e5812f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5e5812f Branch: refs/heads/branch-1.6 Commit: b5e5812f9ef8aa8d133a75bb8aa8dd8680130efa Parents: f6d8661 Author: bomeng Authored: Thu Dec 10 12:53:53 2015 + Committer: Sean Owen Committed: Thu Dec 10 12:54:08 2015 + -- .../org/apache/spark/streaming/StreamingContext.scala | 13 +++-- 1 file changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b5e5812f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6fb8ad3..53324e7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -887,12 +887,13 @@ object StreamingContext extends Logging { } private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { -if (prefix == null) { - time.milliseconds.toString -} else if (suffix == null || suffix.length ==0) { - prefix + "-" + time.milliseconds -} else { - prefix + "-" + time.milliseconds + "." + suffix +var result = time.milliseconds.toString +if (prefix != null && prefix.length > 0) { + result = s"$prefix-$result" +} +if (suffix != null && suffix.length > 0) { + result = s"$result.$suffix" } +result } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters
Repository: spark Updated Branches: refs/heads/master d8ec081c9 -> e29704f90 [SPARK-12136][STREAMING] rddToFileName does not properly handle prefix and suffix parameters The original code does not properly handle the cases where the prefix is null, but suffix is not null - the suffix should be used but is not. The fix is using StringBuilder to construct the proper file name. Author: bomengAuthor: Bo Meng Closes #10185 from bomeng/SPARK-12136. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e29704f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e29704f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e29704f9 Branch: refs/heads/master Commit: e29704f90dfe67d9e276d242699ac0a00f64fb91 Parents: d8ec081 Author: bomeng Authored: Thu Dec 10 12:53:53 2015 + Committer: Sean Owen Committed: Thu Dec 10 12:53:53 2015 + -- .../org/apache/spark/streaming/StreamingContext.scala | 13 +++-- 1 file changed, 7 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e29704f9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index cf843e3..b24c0d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -892,12 +892,13 @@ object StreamingContext extends Logging { } private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { -if (prefix == null) { - time.milliseconds.toString -} else if (suffix == null || suffix.length ==0) { - prefix + "-" + time.milliseconds -} else { - prefix + "-" + time.milliseconds + "." + suffix +var result = time.milliseconds.toString +if (prefix != null && prefix.length > 0) { + result = s"$prefix-$result" +} +if (suffix != null && suffix.length > 0) { + result = s"$result.$suffix" } +result } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
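Editor's note: to see what the fix changes, here is a self-contained sketch that mirrors the new logic, with the streaming `Time` class replaced by a plain `Long` timestamp for brevity; the previously broken case is the one where only the suffix is supplied.

```scala
// Mirrors the fixed rddToFileName logic (Time replaced by a Long).
def fileNameFor(prefix: String, suffix: String, timeMs: Long): String = {
  var result = timeMs.toString
  if (prefix != null && prefix.length > 0) result = s"$prefix-$result"
  if (suffix != null && suffix.length > 0) result = s"$result.$suffix"
  result
}

println(fileNameFor("out", "txt", 1449760433000L))  // out-1449760433000.txt
println(fileNameFor("out", null, 1449760433000L))   // out-1449760433000
println(fileNameFor(null, "txt", 1449760433000L))   // 1449760433000.txt  (the suffix was dropped before this fix)
println(fileNameFor(null, null, 1449760433000L))    // 1449760433000
```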
spark git commit: [SPARK-11530][MLLIB] Return eigenvalues with PCA model
Repository: spark Updated Branches: refs/heads/master e29704f90 -> 21b3d2a75 [SPARK-11530][MLLIB] Return eigenvalues with PCA model Add `computePrincipalComponentsAndVariance` to also compute PCA's explained variance. CC mengxr Author: Sean OwenCloses #9736 from srowen/SPARK-11530. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b3d2a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b3d2a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b3d2a7 Branch: refs/heads/master Commit: 21b3d2a75f679b252e293000d706741dca33624a Parents: e29704f Author: Sean Owen Authored: Thu Dec 10 14:05:45 2015 + Committer: Sean Owen Committed: Thu Dec 10 14:05:45 2015 + -- .../scala/org/apache/spark/ml/feature/PCA.scala | 20 ++-- .../org/apache/spark/mllib/feature/PCA.scala| 16 +++--- .../mllib/linalg/distributed/RowMatrix.scala| 33 +++- .../org/apache/spark/ml/feature/PCASuite.scala | 7 +++-- .../apache/spark/mllib/feature/PCASuite.scala | 3 +- .../linalg/distributed/RowMatrixSuite.scala | 10 +- project/MimaExcludes.scala | 3 ++ 7 files changed, 67 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index aa88cb0..53d33ea 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -73,7 +73,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v} val pca = new feature.PCA(k = $(k)) val pcaModel = pca.fit(input) -copyValues(new PCAModel(uid, pcaModel.pc).setParent(this)) +copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this)) } override def transformSchema(schema: StructType): StructType = { @@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] { @Experimental class PCAModel private[ml] ( override val uid: String, -val pc: DenseMatrix) +val pc: DenseMatrix, +val explainedVariance: DenseVector) extends Model[PCAModel] with PCAParams with MLWritable { import PCAModel._ @@ -123,7 +124,7 @@ class PCAModel private[ml] ( */ override def transform(dataset: DataFrame): DataFrame = { transformSchema(dataset.schema, logging = true) -val pcaModel = new feature.PCAModel($(k), pc) +val pcaModel = new feature.PCAModel($(k), pc, explainedVariance) val pcaOp = udf { pcaModel.transform _ } dataset.withColumn($(outputCol), pcaOp(col($(inputCol } @@ -139,7 +140,7 @@ class PCAModel private[ml] ( } override def copy(extra: ParamMap): PCAModel = { -val copied = new PCAModel(uid, pc) +val copied = new PCAModel(uid, pc, explainedVariance) copyValues(copied, extra).setParent(parent) } @@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] { private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter { -private case class Data(pc: DenseMatrix) +private case class Data(pc: DenseMatrix, explainedVariance: DenseVector) override protected def saveImpl(path: String): Unit = { DefaultParamsWriter.saveMetadata(instance, path, sc) - val data = Data(instance.pc) + val data = Data(instance.pc, instance.explainedVariance) val dataPath = new Path(path, "data").toString sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } @@ -169,10 +170,11 @@ object 
PCAModel extends MLReadable[PCAModel] { override def load(path: String): PCAModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath) -.select("pc") + val Row(pc: DenseMatrix, explainedVariance: DenseVector) = +sqlContext.read.parquet(dataPath) +.select("pc", "explainedVariance") .head() - val model = new PCAModel(metadata.uid, pc) + val model = new PCAModel(metadata.uid, pc, explainedVariance) DefaultParamsReader.getAndSetParams(model, metadata) model } http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala -- diff --git
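Editor's note: a short usage sketch of the new field on the spark.ml side; it assumes an existing `sqlContext` and uses a toy three-row dataset.

```scala
import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

// Toy data; assumes an existing SQLContext named sqlContext.
val data = Seq(
  Vectors.dense(2.0, 0.0, 3.0),
  Vectors.dense(4.0, 1.0, 5.0),
  Vectors.dense(6.0, 2.0, 7.0)
).map(Tuple1.apply)
val df = sqlContext.createDataFrame(data).toDF("features")

val model = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(df)

println(model.pc)                  // principal components matrix, as before
println(model.explainedVariance)   // proportion of variance per component, added by this patch
```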
spark git commit: [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument
Repository: spark Updated Branches: refs/heads/master 9fba9c800 -> d9d354ed4 [SPARK-12234][SPARKR] Fix ```subset``` function error when only set ```select``` argument Fix ```subset``` function error when only set ```select``` argument. Please refer to the [JIRA](https://issues.apache.org/jira/browse/SPARK-12234) about the error and how to reproduce it. cc sun-rui felixcheung shivaram Author: Yanbo LiangCloses #10217 from yanboliang/spark-12234. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9d354ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9d354ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9d354ed Branch: refs/heads/master Commit: d9d354ed40eec56b3f03d32f4e2629d367b1bf02 Parents: 9fba9c8 Author: Yanbo Liang Authored: Thu Dec 10 10:18:58 2015 -0800 Committer: Shivaram Venkataraman Committed: Thu Dec 10 10:18:58 2015 -0800 -- R/pkg/R/DataFrame.R | 9 +++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d9d354ed/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 81b4e6b..f4c4a25 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1185,7 +1185,7 @@ setMethod("[", signature(x = "DataFrame", i = "Column"), #' #' Return subsets of DataFrame according to given conditions #' @param x A DataFrame -#' @param subset A logical expression to filter on rows +#' @param subset (Optional) A logical expression to filter on rows #' @param select expression for the single Column or a list of columns to select from the DataFrame #' @return A new DataFrame containing only the rows that meet the condition with selected columns #' @export @@ -1206,10 +1206,15 @@ setMethod("[", signature(x = "DataFrame", i = "Column"), #' df[df$age %in% c(19, 30), 1:2] #' subset(df, df$age %in% c(19, 30), 1:2) #' subset(df, df$age %in% c(19), select = c(1,2)) +#' subset(df, select = c(1,2)) #' } setMethod("subset", signature(x = "DataFrame"), function(x, subset, select, ...) { -x[subset, select, ...] +if (missing(subset)) { + x[, select, ...] +} else { + x[subset, select, ...] +} }) #' Select http://git-wip-us.apache.org/repos/asf/spark/blob/d9d354ed/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 222c04a..2051784 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -799,6 +799,10 @@ test_that("subsetting", { expect_equal(count(df6), 1) expect_equal(columns(df6), c("name", "age")) + df7 <- subset(df, select = "name") + expect_equal(count(df7), 3) + expect_equal(columns(df7), c("name")) + # Test base::subset is working expect_equal(nrow(subset(airquality, Temp > 80, select = c(Ozone, Temp))), 68) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan
Repository: spark Updated Branches: refs/heads/branch-1.6 93ef24638 -> e541f703d [SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan This PR backports PR #10004 to branch-1.6 It adds a private[sql] method metadata to SparkPlan, which can be used to describe detail information about a physical plan during visualization. Specifically, this PR uses this method to provide details of PhysicalRDDs translated from a data source relation. Author: Cheng LianCloses #10250 from liancheng/spark-12012.for-1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e541f703 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e541f703 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e541f703 Branch: refs/heads/branch-1.6 Commit: e541f703d72d3dd3ad96db55650c5b1a1a5a38e2 Parents: 93ef246 Author: Cheng Lian Authored: Thu Dec 10 10:19:44 2015 -0800 Committer: Yin Huai Committed: Thu Dec 10 10:19:49 2015 -0800 -- python/pyspark/sql/dataframe.py | 2 +- .../spark/sql/execution/ExistingRDD.scala | 19 ++--- .../apache/spark/sql/execution/SparkPlan.scala | 5 +++ .../spark/sql/execution/SparkStrategies.scala | 2 +- .../datasources/DataSourceStrategy.scala| 22 -- .../datasources/parquet/ParquetRelation.scala | 10 + .../spark/sql/execution/ui/SparkPlanGraph.scala | 43 .../apache/spark/sql/sources/interfaces.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++- 10 files changed, 83 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 746bb55..78ab475 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -213,7 +213,7 @@ class DataFrame(object): >>> df.explain() == Physical Plan == -Scan PhysicalRDD[age#0,name#1] +Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 623348f..b8a4302 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -97,22 +97,31 @@ private[sql] case class LogicalRDD( private[sql] case class PhysicalRDD( output: Seq[Attribute], rdd: RDD[InternalRow], -extraInformation: String, +override val nodeName: String, +override val metadata: Map[String, String] = Map.empty, override val outputsUnsafeRows: Boolean = false) extends LeafNode { protected override def doExecute(): RDD[InternalRow] = rdd - override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]") + override def simpleString: String = { +val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" +s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + } } private[sql] object PhysicalRDD { + // Metadata keys + val INPUT_PATHS = "InputPaths" + val PUSHED_FILTERS = "PushedFilters" + def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], relation: BaseRelation, - extraInformation: String = ""): PhysicalRDD = { 
-PhysicalRDD(output, rdd, relation.toString + extraInformation, - relation.isInstanceOf[HadoopFsRelation]) + metadata: Map[String, String] = Map.empty): PhysicalRDD = { +// All HadoopFsRelations output UnsafeRows +val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation] +PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows) } } http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index a781777..ec98f81 100644 ---
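Editor's note: to illustrate what the extra detail looks like once `metadata` is populated, the snippet below re-applies the new `simpleString` formatting from the diff to an invented node name, output list, and metadata map (the path and filter strings are made up).

```scala
// Re-creates the formatting PhysicalRDD.simpleString now uses; all values
// below are invented placeholders.
val nodeName = "ParquetRelation"
val output   = Seq("name#0", "age#1")
val metadata = Map(
  "InputPaths"    -> "hdfs://nn:8020/tmp/people.parquet",  // PhysicalRDD.INPUT_PATHS
  "PushedFilters" -> "[GreaterThan(age,21)]"               // PhysicalRDD.PUSHED_FILTERS
)

val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
println(s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}")
// Scan ParquetRelation[name#0,age#1] InputPaths: hdfs://nn:8020/tmp/people.parquet, PushedFilters: [GreaterThan(age,21)]
```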
spark git commit: [SPARK-12242][SQL] Add DataFrame.transform method
Repository: spark Updated Branches: refs/heads/branch-1.6 b5e5812f9 -> f939c71b1 [SPARK-12242][SQL] Add DataFrame.transform method Author: Reynold XinCloses #10226 from rxin/df-transform. (cherry picked from commit 76540b6df5370b463277d3498097b2cc2d2e97a8) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f939c71b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f939c71b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f939c71b Branch: refs/heads/branch-1.6 Commit: f939c71b187cff3a5bb63aa3659429b6efb0626d Parents: b5e5812 Author: Reynold Xin Authored: Thu Dec 10 22:23:10 2015 +0800 Committer: Reynold Xin Committed: Thu Dec 10 22:23:26 2015 +0800 -- .../src/main/scala/org/apache/spark/sql/Column.scala | 2 +- .../main/scala/org/apache/spark/sql/DataFrame.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f939c71b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index d641fca..297ef22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -84,7 +84,7 @@ class TypedColumn[-T, U]( * col("`a.column.with.dots`") // Escape `.` in column names. * $"columnName" // Scala short hand for a named column. * expr("a + 1") // A column that is constructed from a parsed SQL Expression. - * lit("1")// A column that produces a literal (constant) value. + * lit("abc") // A column that produces a literal (constant) value. * }}} * * [[Column]] objects can be composed to form complex expressions: http://git-wip-us.apache.org/repos/asf/spark/blob/f939c71b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index eb87003..1acfe84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1414,6 +1414,19 @@ class DataFrame private[sql]( def first(): Row = head() /** + * Concise syntax for chaining custom transformations. + * {{{ + * def featurize(ds: DataFrame) = ... + * + * df + * .transform(featurize) + * .transform(...) + * }}} + * @since 1.6.0 + */ + def transform[U](t: DataFrame => DataFrame): DataFrame = t(this) + + /** * Returns a new RDD by applying a function to all rows of this DataFrame. * @group rdd * @since 1.3.0 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12242][SQL] Add DataFrame.transform method
Repository: spark Updated Branches: refs/heads/master 21b3d2a75 -> 76540b6df [SPARK-12242][SQL] Add DataFrame.transform method Author: Reynold XinCloses #10226 from rxin/df-transform. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76540b6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76540b6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76540b6d Branch: refs/heads/master Commit: 76540b6df5370b463277d3498097b2cc2d2e97a8 Parents: 21b3d2a Author: Reynold Xin Authored: Thu Dec 10 22:23:10 2015 +0800 Committer: Reynold Xin Committed: Thu Dec 10 22:23:10 2015 +0800 -- .../src/main/scala/org/apache/spark/sql/Column.scala | 2 +- .../main/scala/org/apache/spark/sql/DataFrame.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76540b6d/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index d641fca..297ef22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -84,7 +84,7 @@ class TypedColumn[-T, U]( * col("`a.column.with.dots`") // Escape `.` in column names. * $"columnName" // Scala short hand for a named column. * expr("a + 1") // A column that is constructed from a parsed SQL Expression. - * lit("1")// A column that produces a literal (constant) value. + * lit("abc") // A column that produces a literal (constant) value. * }}} * * [[Column]] objects can be composed to form complex expressions: http://git-wip-us.apache.org/repos/asf/spark/blob/76540b6d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 243a8c8..da180a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1422,6 +1422,19 @@ class DataFrame private[sql]( def first(): Row = head() /** + * Concise syntax for chaining custom transformations. + * {{{ + * def featurize(ds: DataFrame) = ... + * + * df + * .transform(featurize) + * .transform(...) + * }}} + * @since 1.6.0 + */ + def transform[U](t: DataFrame => DataFrame): DataFrame = t(this) + + /** * Returns a new RDD by applying a function to all rows of this DataFrame. * @group rdd * @since 1.3.0 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
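Editor's note: a brief usage sketch of the new method; `people` is an assumed existing DataFrame, and the helper functions and column names are invented for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Invented helpers: each takes and returns a DataFrame, so they chain cleanly.
def withGreeting(df: DataFrame): DataFrame = df.withColumn("greeting", lit("hello"))
def withFarewell(df: DataFrame): DataFrame = df.withColumn("farewell", lit("bye"))

// transform(t) is just t(this), so user-defined transformations read in
// pipeline order instead of nesting as withFarewell(withGreeting(people)).
val result = people
  .transform(withGreeting)
  .transform(withFarewell)
```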
spark git commit: [SPARK-11832][CORE] Process arguments in spark-shell for Scala 2.11
Repository: spark Updated Branches: refs/heads/master 76540b6df -> db5165246 [SPARK-11832][CORE] Process arguments in spark-shell for Scala 2.11 Process arguments passed to the spark-shell. Fixes running the spark-shell from within a build environment. Author: Jakob OderskyCloses #9824 from jodersky/shell-2.11. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db516524 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db516524 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db516524 Branch: refs/heads/master Commit: db5165246f2888537dd0f3d4c5a515875c7358ed Parents: 76540b6 Author: Jakob Odersky Authored: Thu Dec 10 08:35:52 2015 -0800 Committer: Marcelo Vanzin Committed: Thu Dec 10 08:35:52 2015 -0800 -- .../main/scala/org/apache/spark/repl/Main.scala | 37 ++-- .../scala/org/apache/spark/repl/ReplSuite.scala | 3 +- 2 files changed, 27 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/db516524/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 627148d..455a6b9 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -31,24 +31,39 @@ object Main extends Logging { val tmp = System.getProperty("java.io.tmpdir") val rootDir = conf.get("spark.repl.classdir", tmp) val outputDir = Utils.createTempDir(rootDir) - val s = new Settings() - s.processArguments(List("-Yrepl-class-based", -"-Yrepl-outdir", s"${outputDir.getAbsolutePath}", -"-classpath", getAddedJars.mkString(File.pathSeparator)), true) // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf)) var sparkContext: SparkContext = _ var sqlContext: SQLContext = _ var interp = new SparkILoop // this is a public var because tests reset it. 
+ private var hasErrors = false + + private def scalaOptionError(msg: String): Unit = { +hasErrors = true +Console.err.println(msg) + } + def main(args: Array[String]) { -if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") -// Start the classServer and store its URI in a spark system property -// (which will be passed to executors so that they can connect to it) -classServer.start() -interp.process(s) // Repl starts and goes in loop of R.E.P.L -classServer.stop() -Option(sparkContext).map(_.stop) + +val interpArguments = List( + "-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", + "-classpath", getAddedJars.mkString(File.pathSeparator) +) ++ args.toList + +val settings = new Settings(scalaOptionError) +settings.processArguments(interpArguments, true) + +if (!hasErrors) { + if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + // Start the classServer and store its URI in a spark system property + // (which will be passed to executors so that they can connect to it) + classServer.start() + interp.process(settings) // Repl starts and goes in loop of R.E.P.L + classServer.stop() + Option(sparkContext).map(_.stop) +} } def getAddedJars: Array[String] = { http://git-wip-us.apache.org/repos/asf/spark/blob/db516524/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index bf89979..63f3688 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -54,8 +54,7 @@ class ReplSuite extends SparkFunSuite { new SparkILoop(in, new PrintWriter(out)) } org.apache.spark.repl.Main.interp = interp -Main.s.processArguments(List("-classpath", classpath), true) -Main.main(Array()) // call main +Main.main(Array("-classpath", classpath)) // call main org.apache.spark.repl.Main.interp = null if (oldExecutorClasspath != null) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
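Editor's note: the essence of the change is routing option parsing through a `scala.tools.nsc.Settings` instance whose error callback records failures instead of aborting. Below is a minimal standalone sketch of that pattern; it needs scala-compiler on the classpath, and the sample options are arbitrary.

```scala
import scala.tools.nsc.Settings

// Minimal sketch of the error-collecting pattern used by the patched Main.
var hasErrors = false
val settings = new Settings(msg => { hasErrors = true; Console.err.println(msg) })

// Arbitrary example arguments; user-supplied spark-shell arguments are now
// appended to the interpreter arguments in the same way.
settings.processArguments(List("-classpath", "/tmp/extra.jar", "-deprecation"), true)

if (!hasErrors) {
  println("interpreter classpath: " + settings.classpath.value)
}
```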