spark git commit: [SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs
Repository: spark Updated Branches: refs/heads/master 43fa21b3e -> 24100f162 [SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs ## What changes were proposed in this pull request? This PR proposes to throw an exception for both jdbc APIs when user specified schemas are not allowed or useless. **DataFrameReader.jdbc(...)** ``` scala spark.read.schema(StructType(Nil)).jdbc(...) ``` **DataFrameReader.table(...)** ```scala spark.read.schema(StructType(Nil)).table("usrdb.test") ``` ## How was this patch tested? Unit test in `JDBCSuite` and `DataFrameReaderWriterSuite`. Author: hyukjinkwon Closes #14451 from HyukjinKwon/SPARK-16848. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24100f16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24100f16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24100f16 Branch: refs/heads/master Commit: 24100f162dadb80400cb3e0bc94e4282f10f0c84 Parents: 43fa21b Author: hyukjinkwon Authored: Wed Jan 11 21:03:48 2017 -0800 Committer: gatorsmile Committed: Wed Jan 11 21:03:48 2017 -0800 -- .../org/apache/spark/sql/DataFrameReader.scala | 14 -- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 17 - .../sql/test/DataFrameReaderWriterSuite.scala | 10 ++ 3 files changed, 38 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24100f16/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index cd83836..fe34d59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def jdbc(url: String, table: String, properties: Properties): DataFrame = { +assertNoSpecifiedSchema("jdbc") // properties should override settings in extraOptions. this.extraOptions ++= properties.asScala // explicit url and dbtable should override all @@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { table: String, predicates: Array[String], connectionProperties: Properties): DataFrame = { +assertNoSpecifiedSchema("jdbc") // connectionProperties should override settings in extraOptions. val params = extraOptions.toMap ++ connectionProperties.asScala.toMap val options = new JDBCOptions(url, table, params) @@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def table(tableName: String): DataFrame = { +assertNoSpecifiedSchema("table") sparkSession.table(tableName) } @@ -540,10 +543,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def textFile(paths: String*): Dataset[String] = { +assertNoSpecifiedSchema("textFile") +text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder) + } + + /** + * A convenient function for schema validation in APIs. 
+ */ + private def assertNoSpecifiedSchema(operation: String): Unit = { if (userSpecifiedSchema.nonEmpty) { - throw new AnalysisException("User specified schema not supported with `textFile`") + throw new AnalysisException(s"User specified schema not supported with `$operation`") } -text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder) } /// http://git-wip-us.apache.org/repos/asf/spark/blob/24100f16/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 74ca66b..0396254 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution
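For readers who want to see the new behavior end to end, here is a minimal self-contained sketch of what the added `assertNoSpecifiedSchema` check does from the user side. The JDBC URL and table name below are placeholders, not values from the patch; the error text comes from the helper shown above.

```scala
import java.util.Properties

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("schema-check").getOrCreate()
val schema = StructType(StructField("id", IntegerType) :: Nil)

// After this patch, jdbc() and table() reject a user-specified schema,
// since they always derive the schema themselves.
try {
  // "jdbc:h2:mem:testdb" and "usrdb.test" are placeholder values.
  spark.read.schema(schema).jdbc("jdbc:h2:mem:testdb", "usrdb.test", new Properties())
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // "User specified schema not supported with `jdbc`"
}

try {
  spark.read.schema(schema).table("usrdb.test")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // "User specified schema not supported with `table`"
}
```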
spark git commit: [SPARK-19133][SPARKR][ML][BACKPORT-2.0] fix glm for Gamma, clarify glm family supported
Repository: spark Updated Branches: refs/heads/branch-2.0 6fe676c09 -> ec2fe925c [SPARK-19133][SPARKR][ML][BACKPORT-2.0] fix glm for Gamma, clarify glm family supported ## What changes were proposed in this pull request? Backport to 2.0 (cherry picking from 2.1 didn't work) ## How was this patch tested? unit test Author: Felix Cheung Closes #16543 from felixcheung/rgammabackport20. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec2fe925 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec2fe925 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec2fe925 Branch: refs/heads/branch-2.0 Commit: ec2fe925cd359ca5c132372d4b18ff791b70605a Parents: 6fe676c Author: Felix Cheung Authored: Wed Jan 11 20:01:11 2017 -0800 Committer: Felix Cheung Committed: Wed Jan 11 20:01:11 2017 -0800 -- R/pkg/R/mllib.R| 7 ++- R/pkg/inst/tests/testthat/test_mllib.R | 8 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ec2fe925/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index b33a16a..cd07f27 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -89,6 +89,8 @@ NULL #' This can be a character string naming a family function, a family function or #' the result of a call to a family function. Refer R family at #' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}. +#' Currently these families are supported: \code{binomial}, \code{gaussian}, +#' \code{Gamma}, and \code{poisson}. #' @param tol positive convergence tolerance of iterations. #' @param maxIter integer giving the maximal number of IRLS iterations. #' @param ... additional arguments passed to the method. @@ -134,8 +136,9 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"), formula <- paste(deparse(formula), collapse = "") +# For known families, Gamma is upper-cased jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper", -"fit", formula, data@sdf, family$family, family$link, +"fit", formula, data@sdf, tolower(family$family), family$link, tol, as.integer(maxIter)) return(new("GeneralizedLinearRegressionModel", jobj = jobj)) }) @@ -150,6 +153,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"), #' This can be a character string naming a family function, a family function or #' the result of a call to a family function. Refer R family at #' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}. +#' Currently these families are supported: \code{binomial}, \code{gaussian}, +#' \code{Gamma}, and \code{poisson}. #' @param epsilon positive convergence tolerance of iterations. #' @param maxit integer giving the maximal number of IRLS iterations. #' @return \code{glm} returns a fitted generalized linear model. 
http://git-wip-us.apache.org/repos/asf/spark/blob/ec2fe925/R/pkg/inst/tests/testthat/test_mllib.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 753da81..e0d2e53 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -69,6 +69,14 @@ test_that("spark.glm and predict", { data = iris, family = poisson(link = identity)), iris)) expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) + # Gamma family + x <- runif(100, -1, 1) + y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10) + df <- as.DataFrame(as.data.frame(list(x = x, y = y))) + model <- glm(y ~ x, family = Gamma, df) + out <- capture.output(print(summary(model))) + expect_true(any(grepl("Dispersion parameter for gamma family", out))) + # Test stats::predict is working x <- rnorm(15) y <- x + rnorm(15) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
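The `tolower(family$family)` call added above matters because the JVM-side estimator expects lower-case family names, while R's `Gamma()` family object reports its name as "Gamma". A rough Scala analogue of what the R wrapper ends up invoking is sketched below; the training DataFrame is assumed to exist and is not part of the patch.

```scala
import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Spark ML identifies the family as "gamma" (lower case); passing the
// R-style "Gamma" is what this fix works around by lower-casing in mllib.R.
val glr = new GeneralizedLinearRegression()
  .setFamily("gamma")
  .setLink("inverse") // canonical link for the Gamma family
  .setMaxIter(25)

// `trainingData` is an assumed DataFrame with "label" and "features" columns.
// val model = glr.fit(trainingData)
// println(model.summary)
```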
spark git commit: [SPARK-19132][SQL] Add test cases for row size estimation and aggregate estimation
Repository: spark Updated Branches: refs/heads/master 66fe819ad -> 43fa21b3e [SPARK-19132][SQL] Add test cases for row size estimation and aggregate estimation ## What changes were proposed in this pull request? In this pr, we add more test cases for project and aggregate estimation. ## How was this patch tested? Add test cases. Author: wangzhenhua Closes #16551 from wzhfy/addTests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43fa21b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43fa21b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43fa21b3 Branch: refs/heads/master Commit: 43fa21b3e62ee108bcecb74398f431f08c6b625c Parents: 66fe819 Author: wangzhenhua Authored: Wed Jan 11 15:00:58 2017 -0800 Committer: Reynold Xin Committed: Wed Jan 11 15:00:58 2017 -0800 -- .../statsEstimation/AggregateEstimation.scala | 14 +- .../statsEstimation/EstimationUtils.scala | 11 +- .../statsEstimation/ProjectEstimation.scala | 2 +- .../statsEstimation/AggEstimationSuite.scala| 135 --- .../AggregateEstimationSuite.scala | 116 .../ProjectEstimationSuite.scala| 119 +--- .../StatsEstimationTestBase.scala | 11 +- 7 files changed, 248 insertions(+), 160 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43fa21b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala index af67343..21e94fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala @@ -41,13 +41,19 @@ object AggregateEstimation { var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))( (res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount) - // Here we set another upper bound for the number of output rows: it must not be larger than - // child's number of rows. - outputRows = outputRows.min(childStats.rowCount.get) + outputRows = if (agg.groupingExpressions.isEmpty) { +// If there's no group-by columns, the output is a single row containing values of aggregate +// functions: aggregated results for non-empty input or initial values for empty input. +1 + } else { +// Here we set another upper bound for the number of output rows: it must not be larger than +// child's number of rows. 
+outputRows.min(childStats.rowCount.get) + } val outputAttrStats = getOutputMap(childStats.attributeStats, agg.output) Some(Statistics( -sizeInBytes = outputRows * getRowSize(agg.output, outputAttrStats), +sizeInBytes = getOutputSize(agg.output, outputAttrStats, outputRows), rowCount = Some(outputRows), attributeStats = outputAttrStats, isBroadcastable = childStats.isBroadcastable)) http://git-wip-us.apache.org/repos/asf/spark/blob/43fa21b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index c7eb6f0..cf4452d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -35,10 +35,13 @@ object EstimationUtils { AttributeMap(output.flatMap(a => inputMap.get(a).map(a -> _))) } - def getRowSize(attributes: Seq[Attribute], attrStats: AttributeMap[ColumnStat]): Long = { + def getOutputSize( + attributes: Seq[Attribute], + attrStats: AttributeMap[ColumnStat], + outputRowCount: BigInt): BigInt = { // We assign a generic overhead for a Row object, the actual overhead is different for different // Row format. -8 + attributes.map { attr => +val sizePerRow = 8 + attributes.map { attr => if (attrStats.contains(attr)) { attr.dataType match { case Stri
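To make the estimation rules these tests exercise easier to follow, here is a small self-contained sketch of the same arithmetic. It is not the Catalyst code itself; the distinct counts and attribute sizes are made-up numbers.

```scala
// Simplified model of aggregate estimation: the product of per-column
// distinct counts for the grouping keys, capped by the child's row count,
// with exactly one row produced for a global (no group-by) aggregate.
def estimateAggregateRows(groupingDistinctCounts: Seq[BigInt], childRowCount: BigInt): BigInt =
  if (groupingDistinctCounts.isEmpty) {
    BigInt(1) // a global aggregate always returns a single row
  } else {
    groupingDistinctCounts.product.min(childRowCount)
  }

// Output size = rows * (8-byte per-row overhead + per-attribute average sizes),
// mirroring getOutputSize above.
def estimateOutputSize(rowCount: BigInt, attrSizesInBytes: Seq[Long]): BigInt =
  rowCount * (8 + attrSizesInBytes.sum)

// Example: GROUP BY on two columns with 4 and 10 distinct values over 25 rows.
val rows = estimateAggregateRows(Seq(BigInt(4), BigInt(10)), childRowCount = BigInt(25)) // 25
val size = estimateOutputSize(rows, Seq(4L, 8L)) // 25 * (8 + 12) = 500 bytes
```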
spark git commit: [SPARK-19149][SQL] Follow-up: simplify cache implementation.
Repository: spark Updated Branches: refs/heads/master 30a07071f -> 66fe819ad [SPARK-19149][SQL] Follow-up: simplify cache implementation. ## What changes were proposed in this pull request? This patch simplifies slightly the logical plan statistics cache implementation, as discussed in https://github.com/apache/spark/pull/16529 ## How was this patch tested? N/A - this has no behavior change. Author: Reynold Xin Closes #16544 from rxin/SPARK-19149. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66fe819a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66fe819a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66fe819a Branch: refs/heads/master Commit: 66fe819ada6435f3a351c2d257e73b8e6f6085cd Parents: 30a0707 Author: Reynold Xin Authored: Wed Jan 11 14:25:36 2017 -0800 Committer: Reynold Xin Committed: Wed Jan 11 14:25:36 2017 -0800 -- .../catalyst/plans/logical/LogicalPlan.scala| 21 1 file changed, 13 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/66fe819a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9e5ba9c..0587a59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -82,17 +82,22 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } /** A cache for the estimated statistics, such that it will only be computed once. */ - private val statsCache = new ThreadLocal[Option[Statistics]] { -override protected def initialValue: Option[Statistics] = None - } + private var statsCache: Option[Statistics] = None - def stats(conf: CatalystConf): Statistics = statsCache.get.getOrElse { -statsCache.set(Some(computeStats(conf))) -statsCache.get.get + /** + * Returns the estimated statistics for the current logical plan node. Under the hood, this + * method caches the return value, which is computed based on the configuration passed in the + * first time. If the configuration changes, the cache can be invalidated by calling + * [[invalidateStatsCache()]]. + */ + final def stats(conf: CatalystConf): Statistics = statsCache.getOrElse { +statsCache = Some(computeStats(conf)) +statsCache.get } - def invalidateStatsCache(): Unit = { -statsCache.set(None) + /** Invalidates the stats cache. See [[stats]] for more information. */ + final def invalidateStatsCache(): Unit = { +statsCache = None children.foreach(_.invalidateStatsCache()) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
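The change replaces the ThreadLocal holder with a plain compute-once field plus explicit invalidation. The same pattern in isolation looks like the generic sketch below; this is not the LogicalPlan code, just the caching idea it uses.

```scala
// Generic compute-once cache with explicit invalidation: the first call to
// get computes and stores the value, later calls return it unchanged until
// invalidate() clears it.
final class CachedValue[T](compute: () => T) {
  private var cached: Option[T] = None

  def get: T = cached.getOrElse {
    val value = compute()
    cached = Some(value)
    value
  }

  def invalidate(): Unit = { cached = None }
}

// Usage sketch: an expensive statistics computation runs only once.
val stats = new CachedValue(() => { println("computing..."); 42L })
stats.get        // prints "computing...", returns 42
stats.get        // returns 42 without recomputing
stats.invalidate()
stats.get        // recomputes
```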
spark git commit: [SPARK-18801][SQL] Support resolve a nested view
Repository: spark Updated Branches: refs/heads/master 3bc2eff88 -> 30a07071f [SPARK-18801][SQL] Support resolve a nested view ## What changes were proposed in this pull request? We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated. The new approach should be compatible with older versions of Spark/Hive, which means: 1. The new approach should be able to resolve the views created by older versions of Spark/Hive; 2. The new approach should be able to resolve the views that are currently supported by Spark SQL. The new approach mainly brings in the following changes: 1. Add a new operator called `View` to keep track of the CatalogTable that describes the view, the output attributes, and the child of the view; 2. Update the `ResolveRelations` rule to resolve relations and views, making sure a nested view is resolved correctly; 3. Add a `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view; if the `CatalogTable` is not a view, the variable should be `None`; 4. Add `AnalysisContext` to enable us to still support a view created with a CTE/window query; 5. Enable view support without enabling Hive support (i.e., enableHiveSupport); 6. Fix a weird behavior: the result of a view query may have a different schema if the referenced table has been changed. After this PR, we try to cast the child output attributes to those in the view schema, and throw an AnalysisException if the cast is not allowed. Note this is compatible with the views defined by older versions of Spark (before 2.2), which have an empty `defaultDatabase` and in which all the relations in `viewText` have the database part defined. ## How was this patch tested? 1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`; 2. Add a new test case in `SQLViewSuite` to test resolving a nested view. Author: jiangxingbo Closes #16233 from jiangxb1987/resolve-view.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30a07071 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30a07071 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30a07071 Branch: refs/heads/master Commit: 30a07071f099c0ebcf04c4df61f8d414dcbad7b5 Parents: 3bc2eff Author: jiangxingbo Authored: Wed Jan 11 13:44:07 2017 -0800 Committer: Herman van Hovell Committed: Wed Jan 11 13:44:07 2017 -0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 158 +++ .../spark/sql/catalyst/analysis/view.scala | 80 ++ .../sql/catalyst/catalog/SessionCatalog.scala | 31 +++- .../spark/sql/catalyst/catalog/interface.scala | 9 ++ .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../plans/logical/basicLogicalOperators.scala | 30 .../catalyst/catalog/ExternalCatalogSuite.scala | 30 +++- .../catalyst/catalog/SessionCatalogSuite.scala | 36 +++-- .../apache/spark/sql/catalyst/SQLBuilder.scala | 3 + .../spark/sql/internal/SessionState.scala | 3 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 10 ++ .../spark/sql/hive/HiveExternalCatalog.scala| 4 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 20 ++- .../spark/sql/hive/HiveSessionCatalog.scala | 12 +- .../spark/sql/hive/HiveSessionState.scala | 3 +- .../spark/sql/hive/execution/SQLViewSuite.scala | 155 ++ 16 files changed, 516 insertions(+), 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 73e9206..d461531 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -21,8 +21,8 @@ import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -50,6 +50,39 @@ object SimpleAn
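From the user's point of view, the nested case this PR makes resolvable looks roughly like the sketch below. `spark` is an existing SparkSession, the table and view names are made up, and per point 5 of the description this also works without enabling Hive support.

```scala
// Nested view resolution: view2 is defined on top of view1, which is defined
// on a base table. After this change the analyzer resolves the whole chain,
// so updating view1 is reflected the next time view2 is queried.
spark.range(10).withColumnRenamed("id", "x").write.saveAsTable("base_tbl")

spark.sql("CREATE VIEW view1 AS SELECT x FROM base_tbl WHERE x > 2")
spark.sql("CREATE VIEW view2 AS SELECT x * 2 AS doubled FROM view1")

// Resolving view2 requires resolving view1 first -- the nested case.
spark.sql("SELECT * FROM view2").show()
```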
spark git commit: [SPARK-17568][CORE][DEPLOY] Add spark-submit option to override ivy settings used to resolve packages/artifacts
Repository: spark Updated Branches: refs/heads/master d749c0667 -> 3bc2eff88 [SPARK-17568][CORE][DEPLOY] Add spark-submit option to override ivy settings used to resolve packages/artifacts ## What changes were proposed in this pull request? Adds an option to spark-submit that allows overriding the default IvySettings used to resolve artifacts as part of the Spark Packages functionality. This allows all artifact resolution to go through a centrally managed repository, such as Nexus or Artifactory, where site admins can better approve and control what is used with Spark apps. This change restructures the creation of the IvySettings object in two distinct ways. First, if the `spark.jars.ivySettings` option is not defined, then `buildIvySettings` will create a default settings instance, as before, with the default repositories (Maven Central) included. Second, if the option is defined, the Ivy settings file will be loaded from the given path and only the repositories defined within it will be used for artifact resolution. ## How was this patch tested? Existing tests for the default behaviour, plus manual tests that load an ivysettings.xml file with local and Nexus repositories defined. Added a new test that loads a simple Ivy settings file with a local filesystem resolver. Author: Bryan Cutler Author: Ian Hummel Closes #15119 from BryanCutler/spark-custom-IvySettings. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bc2eff8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bc2eff8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bc2eff8 Branch: refs/heads/master Commit: 3bc2eff8880a3ba8d4318118715ea1a47048e3de Parents: d749c06 Author: Bryan Cutler Authored: Wed Jan 11 11:57:38 2017 -0800 Committer: Marcelo Vanzin Committed: Wed Jan 11 11:57:38 2017 -0800 -- .../org/apache/spark/deploy/SparkSubmit.scala | 149 +-- .../spark/deploy/SparkSubmitUtilsSuite.scala| 100 ++--- docs/configuration.md | 26 +++- .../sql/hive/client/IsolatedClientLoader.scala | 5 +- 4 files changed, 206 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bc2eff8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 85f80b6..a980144 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy -import java.io.{File, PrintStream} +import java.io.{File, IOException, PrintStream} import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} import java.net.URL import java.security.PrivilegedExceptionAction +import java.text.ParseException import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, HashMap, Map} @@ -283,8 +284,17 @@ object SparkSubmit extends CommandLineUtils { } else { Nil } + +// Create the IvySettings, either load from file or build defaults +val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile => + SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories), +Option(args.ivyRepoPath)) +}.getOrElse { + SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath)) +} + val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages, - Option(args.repositories), Option(args.ivyRepoPath),
exclusions = exclusions) + ivySettings, exclusions = exclusions) if (!StringUtils.isBlank(resolvedMavenCoordinates)) { args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates) if (args.isPython) { @@ -860,30 +870,13 @@ private[spark] object SparkSubmitUtils { /** * Extracts maven coordinates from a comma-delimited string - * @param remoteRepos Comma-delimited string of remote repositories - * @param ivySettings The Ivy settings for this session + * @param defaultIvyUserDir The default user path for Ivy * @return A ChainResolver used by Ivy to search for and resolve dependencies. */ - def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = { + def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { // We need a chain resolver if we want to check multiple repositories val cr = new ChainResolver -cr.setName("list") - -val repositoryList = remoteRepos.getOrElse("") -// add any other remote repositories other than maven ce
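As a rough sketch of what loading a custom settings file involves, the snippet below uses Ivy's own API rather than the exact `SparkSubmitUtils` code; the file path is a placeholder. In practice the path is supplied through the `spark.jars.ivySettings` property read in the SparkSubmit change above.

```scala
import java.io.File

import org.apache.ivy.core.settings.IvySettings

// When spark.jars.ivySettings points at a file, resolution is driven only by
// the resolvers declared in that file (e.g. a Nexus or Artifactory mirror)
// instead of the built-in defaults such as Maven Central.
val settingsFile = new File("/etc/spark/ivysettings.xml") // placeholder path
val ivySettings = new IvySettings()
ivySettings.load(settingsFile) // may throw IOException or ParseException
// The populated IvySettings is then handed to the dependency resolution step.
```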
spark git commit: [SPARK-19130][SPARKR] Support setting literal value as column implicitly
Repository: spark Updated Branches: refs/heads/branch-2.1 1022049c7 -> 82fcc1330 [SPARK-19130][SPARKR] Support setting literal value as column implicitly ## What changes were proposed in this pull request? ``` df$foo <- 1 ``` instead of ``` df$foo <- lit(1) ``` ## How was this patch tested? unit tests Author: Felix Cheung Closes #16510 from felixcheung/rlitcol. (cherry picked from commit d749c06677c2fd38377f1c00f542da122b8d) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82fcc133 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82fcc133 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82fcc133 Branch: refs/heads/branch-2.1 Commit: 82fcc133040cb5ef32f10df73fcb9fd8914aa9c1 Parents: 1022049 Author: Felix Cheung Authored: Wed Jan 11 08:29:09 2017 -0800 Committer: Shivaram Venkataraman Committed: Wed Jan 11 08:29:30 2017 -0800 -- R/pkg/R/DataFrame.R | 22 +- R/pkg/R/utils.R | 4 R/pkg/inst/tests/testthat/test_sparkSQL.R | 18 ++ 3 files changed, 39 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82fcc133/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 058a77e..c79b1d3 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1721,14 +1721,21 @@ setMethod("$", signature(x = "SparkDataFrame"), getColumn(x, name) }) -#' @param value a Column or \code{NULL}. If \code{NULL}, the specified Column is dropped. +#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}. +#' If \code{NULL}, the specified Column is dropped. #' @rdname select #' @name $<- #' @aliases $<-,SparkDataFrame-method #' @note $<- since 1.4.0 setMethod("$<-", signature(x = "SparkDataFrame"), function(x, name, value) { -stopifnot(class(value) == "Column" || is.null(value)) +if (class(value) != "Column" && !is.null(value)) { + if (isAtomicLengthOne(value)) { +value <- lit(value) + } else { +stop("value must be a Column, literal value as atomic in length of 1, or NULL") + } +} if (is.null(value)) { nx <- drop(x, name) @@ -1941,10 +1948,10 @@ setMethod("selectExpr", #' #' @param x a SparkDataFrame. #' @param colName a column name. -#' @param col a Column expression. +#' @param col a Column expression, or an atomic vector in the length of 1 as literal value. #' @return A SparkDataFrame with the new column added or the existing column replaced. 
#' @family SparkDataFrame functions -#' @aliases withColumn,SparkDataFrame,character,Column-method +#' @aliases withColumn,SparkDataFrame,character-method #' @rdname withColumn #' @name withColumn #' @seealso \link{rename} \link{mutate} @@ -1957,11 +1964,16 @@ setMethod("selectExpr", #' newDF <- withColumn(df, "newCol", df$col1 * 5) #' # Replace an existing column #' newDF2 <- withColumn(newDF, "newCol", newDF$col1) +#' newDF3 <- withColumn(newDF, "newCol", 42) #' } #' @note withColumn since 1.4.0 setMethod("withColumn", - signature(x = "SparkDataFrame", colName = "character", col = "Column"), + signature(x = "SparkDataFrame", colName = "character"), function(x, colName, col) { +if (class(col) != "Column") { + if (!isAtomicLengthOne(col)) stop("Literal value must be atomic in length of 1") + col <- lit(col) +} sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc) dataFrame(sdf) }) http://git-wip-us.apache.org/repos/asf/spark/blob/82fcc133/R/pkg/R/utils.R -- diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 1283449..74b3e50 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -863,3 +863,7 @@ basenameSansExtFromUrl <- function(url) { # then, strip extension by the last '.' sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename) } + +isAtomicLengthOne <- function(x) { + is.atomic(x) && length(x) == 1 +} http://git-wip-us.apache.org/repos/asf/spark/blob/82fcc133/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 4490f31..0be924f 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1001,6 +1001,17 @@ test_that("select operators", { expect_equal(column
spark git commit: [SPARK-19130][SPARKR] Support setting literal value as column implicitly
Repository: spark Updated Branches: refs/heads/master 4239a1081 -> d749c0667 [SPARK-19130][SPARKR] Support setting literal value as column implicitly ## What changes were proposed in this pull request? ``` df$foo <- 1 ``` instead of ``` df$foo <- lit(1) ``` ## How was this patch tested? unit tests Author: Felix Cheung Closes #16510 from felixcheung/rlitcol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d749c066 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d749c066 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d749c066 Branch: refs/heads/master Commit: d749c06677c2fd38377f1c00f542da122b8d Parents: 4239a10 Author: Felix Cheung Authored: Wed Jan 11 08:29:09 2017 -0800 Committer: Shivaram Venkataraman Committed: Wed Jan 11 08:29:09 2017 -0800 -- R/pkg/R/DataFrame.R | 22 +- R/pkg/R/utils.R | 4 R/pkg/inst/tests/testthat/test_sparkSQL.R | 18 ++ 3 files changed, 39 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d749c066/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index c56648a..3d912c9 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1727,14 +1727,21 @@ setMethod("$", signature(x = "SparkDataFrame"), getColumn(x, name) }) -#' @param value a Column or \code{NULL}. If \code{NULL}, the specified Column is dropped. +#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}. +#' If \code{NULL}, the specified Column is dropped. #' @rdname select #' @name $<- #' @aliases $<-,SparkDataFrame-method #' @note $<- since 1.4.0 setMethod("$<-", signature(x = "SparkDataFrame"), function(x, name, value) { -stopifnot(class(value) == "Column" || is.null(value)) +if (class(value) != "Column" && !is.null(value)) { + if (isAtomicLengthOne(value)) { +value <- lit(value) + } else { +stop("value must be a Column, literal value as atomic in length of 1, or NULL") + } +} if (is.null(value)) { nx <- drop(x, name) @@ -1947,10 +1954,10 @@ setMethod("selectExpr", #' #' @param x a SparkDataFrame. #' @param colName a column name. -#' @param col a Column expression. +#' @param col a Column expression, or an atomic vector in the length of 1 as literal value. #' @return A SparkDataFrame with the new column added or the existing column replaced. #' @family SparkDataFrame functions -#' @aliases withColumn,SparkDataFrame,character,Column-method +#' @aliases withColumn,SparkDataFrame,character-method #' @rdname withColumn #' @name withColumn #' @seealso \link{rename} \link{mutate} @@ -1963,11 +1970,16 @@ setMethod("selectExpr", #' newDF <- withColumn(df, "newCol", df$col1 * 5) #' # Replace an existing column #' newDF2 <- withColumn(newDF, "newCol", newDF$col1) +#' newDF3 <- withColumn(newDF, "newCol", 42) #' } #' @note withColumn since 1.4.0 setMethod("withColumn", - signature(x = "SparkDataFrame", colName = "character", col = "Column"), + signature(x = "SparkDataFrame", colName = "character"), function(x, colName, col) { +if (class(col) != "Column") { + if (!isAtomicLengthOne(col)) stop("Literal value must be atomic in length of 1") + col <- lit(col) +} sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc) dataFrame(sdf) }) http://git-wip-us.apache.org/repos/asf/spark/blob/d749c066/R/pkg/R/utils.R -- diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 1283449..74b3e50 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -863,3 +863,7 @@ basenameSansExtFromUrl <- function(url) { # then, strip extension by the last '.' 
sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename) } + +isAtomicLengthOne <- function(x) { + is.atomic(x) && length(x) == 1 +} http://git-wip-us.apache.org/repos/asf/spark/blob/d749c066/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index c3f0310..3e8b96a 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1001,6 +1001,17 @@ test_that("select operators", { expect_equal(columns(df), c("name", "age", "age2")) expect_equal(count(where(df, df$age2 == df$age * 2)), 2) + df$age2 <- 21 + e
spark-website git commit: First Java example does not work with recent Spark version (see https://issues.apache.org/jira/browse/SPARK-19156)
Repository: spark-website Updated Branches: refs/heads/asf-site 46a7a8027 -> e95223137 First Java example does not work with recent Spark version (see https://issues.apache.org/jira/browse/SPARK-19156) Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/e9522313 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/e9522313 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/e9522313 Branch: refs/heads/asf-site Commit: e9522313769f23470b35b38d3a1179c5f5dc3fdc Parents: 46a7a80 Author: Rafael Guglielmetti Authored: Wed Jan 11 10:21:06 2017 +0100 Committer: Sean Owen Committed: Wed Jan 11 16:16:53 2017 + -- examples.md| 4 ++-- site/examples.html | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/e9522313/examples.md -- diff --git a/examples.md b/examples.md index b4ac734..7f13e41 100644 --- a/examples.md +++ b/examples.md @@ -62,7 +62,7 @@ counts.saveAsTextFile("hdfs://...") {% highlight java %} JavaRDD textFile = sc.textFile("hdfs://..."); JavaRDD words = textFile.flatMap(new FlatMapFunction() { - public Iterable call(String s) { return Arrays.asList(s.split(" ")); } + public Iterator call(String s) { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD pairs = words.mapToPair(new PairFunction() { public Tuple2 call(String s) { return new Tuple2(s, 1); } @@ -413,4 +413,4 @@ model.transform(df).show(); Many additional examples are distributed with Spark: * Basic Spark: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), [Python examples](https://github.com/apache/spark/tree/master/examples/src/main/python) - * Spark Streaming: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming) \ No newline at end of file + * Spark Streaming: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming) http://git-wip-us.apache.org/repos/asf/spark-website/blob/e9522313/site/examples.html -- diff --git a/site/examples.html b/site/examples.html index ebaec55..bfff52d 100644 --- a/site/examples.html +++ b/site/examples.html @@ -248,7 +248,7 @@ In this page, we will show examples using RDD API as well as examples using high JavaRDDtextFile = sc.textFile("hdfs://..."); JavaRDD words = textFile.flatMap(new FlatMapFunction () { - public Iterable call(String s) { return Arrays.asList(s.split(" ")); } + public Iterator call(String s) { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD pairs = words.mapToPair(new PairFunction () { public Tuple2 call(String s) { return new Tuple2 (s, 1); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19021][YARN] Generalize HDFSCredentialProvider to support non-HDFS secure filesystems
Repository: spark Updated Branches: refs/heads/master a61551356 -> 4239a1081 [SPARK-19021][YARN] Generalize HDFSCredentialProvider to support non-HDFS secure filesystems ## What changes were proposed in this pull request? Currently Spark can only get the token renewal interval from secure HDFS (hdfs://). If Spark runs against other secure file systems such as webHDFS (webhdfs://), wasb (wasb://), or ADLS, it ignores those tokens and does not obtain renewal intervals for them, which makes Spark unable to work with these secure clusters. So instead of only checking the HDFS token, we should generalize to support different DelegationTokenIdentifier types. ## How was this patch tested? Manually verified in a secure cluster. Author: jerryshao Closes #16432 from jerryshao/SPARK-19021. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4239a108 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4239a108 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4239a108 Branch: refs/heads/master Commit: 4239a1081ad96a503fbf9277e42b97422bb8af3e Parents: a615513 Author: jerryshao Authored: Wed Jan 11 09:24:02 2017 -0600 Committer: Tom Graves Committed: Wed Jan 11 09:24:02 2017 -0600 -- docs/running-on-yarn.md | 12 +- ...ploy.yarn.security.ServiceCredentialProvider | 2 +- .../ConfigurableCredentialManager.scala | 2 +- .../yarn/security/HDFSCredentialProvider.scala | 111 - .../security/HadoopFSCredentialProvider.scala | 120 +++ .../ConfigurableCredentialManagerSuite.scala| 8 +- .../security/HDFSCredentialProviderSuite.scala | 71 --- .../HadoopFSCredentialProviderSuite.scala | 71 +++ 8 files changed, 203 insertions(+), 194 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/docs/running-on-yarn.md -- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index a072975..f751345 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -479,12 +479,12 @@ Hadoop services issue *hadoop tokens* to grant access to the services and data. Clients must first acquire tokens for the services they will access and pass them along with their application as it is launched in the YARN cluster. -For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens +For a Spark application to interact with any of the Hadoop filesystem (for example hdfs, webhdfs, etc), HBase and Hive, it must acquire the relevant tokens using the Kerberos credentials of the user launching the application -- that is, the principal whose identity will become that of the launched Spark application. This is normally done at launch time: in a secure cluster Spark will automatically obtain a -token for the cluster's HDFS filesystem, and potentially for HBase and Hive. +token for the cluster's default Hadoop filesystem, and potentially for HBase and Hive. An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`), @@ -494,12 +494,12 @@ Similarly, a Hive token will be obtained if Hive is on the classpath, its config includes a URI of the metadata store in `hive.metastore.uris`, and `spark.yarn.security.credentials.hive.enabled` is not set to `false`. -If an application needs to interact with other secure HDFS clusters, then +If an application needs to interact with other secure Hadoop filesystems, then the tokens needed to access these clusters must be explicitly requested at launch time.
This is done by listing them in the `spark.yarn.access.namenodes` property. ``` -spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/ +spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/ ``` Spark supports integrating with other security-aware services through Java Services mechanism (see @@ -558,8 +558,8 @@ For Spark applications, the Oozie workflow must be set up for Oozie to request all tokens which the application needs, including: - The YARN resource manager. -- The local HDFS filesystem. -- Any remote HDFS filesystems used as a source or destination of I/O. +- The local Hadoop filesystem. +- Any remote Hadoop filesystems used as a source or destination of I/O. - Hive (if used). - HBase (if used). - The YARN timeline server, if the application interacts with this. http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider ---
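The generalization rests on the fact that any Hadoop filesystem delegation token exposes issue and max dates through its identifier, so there is no need to special-case HDFS. The sketch below illustrates that idea using Hadoop's public token APIs; it is not the actual `HadoopFSCredentialProvider` code, whose renewal-interval logic is more involved, and the filesystem URI and renewer are placeholders.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// Collect delegation tokens from an arbitrary secure Hadoop filesystem
// (hdfs://, webhdfs://, wasb://, ...) and read the validity window from any
// token whose identifier is a delegation-token identifier, instead of
// handling only HDFS as the old HDFSCredentialProvider did.
def tokenValidityWindows(fsUri: String, renewer: String, conf: Configuration): Seq[Long] = {
  val creds = new Credentials()
  FileSystem.get(new Path(fsUri).toUri, conf).addDelegationTokens(renewer, creds)
  creds.getAllTokens.asScala.toSeq.flatMap { token =>
    token.decodeIdentifier() match {
      case id: AbstractDelegationTokenIdentifier =>
        Some(id.getMaxDate - id.getIssueDate) // validity window in milliseconds
      case _ => None // not a delegation token, or identifier class unavailable
    }
  }
}
```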