spark git commit: [SPARK-21640][FOLLOW-UP][SQL] added errorifexists on IllegalArgumentException message
Repository: spark Updated Branches: refs/heads/master f763d8464 -> 312bebfb6 [SPARK-21640][FOLLOW-UP][SQL] added errorifexists on IllegalArgumentException message ## What changes were proposed in this pull request? This commit adds the 'errorifexists' save mode to the IllegalArgumentException message for unknown save modes. The mode itself was added in this recent commit: [https://github.com/apache/spark/commit/dcac1d57f0fd05605edf596c303546d83062a352](https://github.com/apache/spark/commit/dcac1d57f0fd05605edf596c303546d83062a352) ## How was this patch tested? Unit tests have passed. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Marcos P. Sanchez Closes #18862 from mpenate/feature/exception-errorifexists. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/312bebfb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/312bebfb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/312bebfb Branch: refs/heads/master Commit: 312bebfb6d9e3fc8d48d3c1f7509ba05059bd8b0 Parents: f763d84 Author: Marcos P. Sanchez Authored: Mon Aug 7 22:41:57 2017 -0700 Committer: gatorsmile Committed: Mon Aug 7 22:41:57 2017 -0700 -- docs/sql-programming-guide.md | 2 +- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/312bebfb/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 7f7cf59..2ac2383 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -514,7 +514,7 @@ new data. Scala/JavaAny LanguageMeaning SaveMode.ErrorIfExists (default) - "error" (default) + "error" or "errorifexists" (default) When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. http://git-wip-us.apache.org/repos/asf/spark/blob/312bebfb/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 079f699..65c9ef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -73,7 +73,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case "ignore" => SaveMode.Ignore case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + -"Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") +"Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.") } this }
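For reference, a minimal Scala sketch of the string save modes accepted by `DataFrameWriter.mode`, matching the match expression touched above; the DataFrame contents and output path are placeholders.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("SaveModeExample").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// "error" and "errorifexists" both resolve to SaveMode.ErrorIfExists: the write
// fails with an exception if data already exists at the target path.
df.write.mode("errorifexists").parquet("/tmp/save-mode-example")

// Equivalent, using the enum instead of the string form:
// df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/save-mode-example")
```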
spark git commit: [SPARK-21306][ML] For branch 2.0, OneVsRest should support setWeightCol
Repository: spark Updated Branches: refs/heads/branch-2.0 c27a01aec -> 9f670ce5d [SPARK-21306][ML] For branch 2.0, OneVsRest should support setWeightCol The PR is related to #18554, and is modified for branch 2.0. ## What changes were proposed in this pull request? add `setWeightCol` method for OneVsRest. `weightCol` is ignored if classifier doesn't inherit HasWeightCol trait. ## How was this patch tested? + [x] add an unit test. Author: Yan Facai (é¢åæ)Closes #18764 from facaiy/BUG/branch-2.0_OneVsRest_support_setWeightCol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f670ce5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f670ce5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f670ce5 Branch: refs/heads/branch-2.0 Commit: 9f670ce5d1aeef737226185d78f07147f0cc2693 Parents: c27a01a Author: Yan Facai (é¢åæ) Authored: Tue Aug 8 11:18:15 2017 +0800 Committer: Yanbo Liang Committed: Tue Aug 8 11:18:15 2017 +0800 -- .../spark/ml/classification/OneVsRest.scala | 39 ++-- .../ml/classification/OneVsRestSuite.scala | 11 ++ python/pyspark/ml/classification.py | 27 +++--- python/pyspark/ml/tests.py | 14 +++ 4 files changed, 82 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9f670ce5/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index f4ab0a0..770d5db 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -34,6 +34,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param.shared.HasWeightCol import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -53,7 +54,8 @@ private[ml] trait ClassifierTypeTrait { /** * Params for [[OneVsRest]]. */ -private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTrait { +private[ml] trait OneVsRestParams extends PredictorParams + with ClassifierTypeTrait with HasWeightCol { /** * param for the base binary classifier that we reduce multiclass classification into. @@ -290,6 +292,18 @@ final class OneVsRest @Since("1.4.0") ( @Since("1.5.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + /** + * Sets the value of param [[weightCol]]. + * + * This is ignored if weight is not supported by [[classifier]]. + * If this is not set or empty, we treat all instance weights as 1.0. + * Default is not set, so all instances have weight one. 
+ * + * @group setParam + */ + @Since("2.3.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + @Since("1.4.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType) @@ -308,7 +322,20 @@ final class OneVsRest @Since("1.4.0") ( } val numClasses = MetadataUtils.getNumClasses(labelSchema).fold(computeNumClasses())(identity) -val multiclassLabeled = dataset.select($(labelCol), $(featuresCol)) +val weightColIsUsed = isDefined(weightCol) && $(weightCol).nonEmpty && { + getClassifier match { +case _: HasWeightCol => true +case c => + logWarning(s"weightCol is ignored, as it is not supported by $c now.") + false + } +} + +val multiclassLabeled = if (weightColIsUsed) { + dataset.select($(labelCol), $(featuresCol), $(weightCol)) +} else { + dataset.select($(labelCol), $(featuresCol)) +} // persist if underlying dataset is not persistent. val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE @@ -328,7 +355,13 @@ final class OneVsRest @Since("1.4.0") ( paramMap.put(classifier.labelCol -> labelColName) paramMap.put(classifier.featuresCol -> getFeaturesCol) paramMap.put(classifier.predictionCol -> getPredictionCol) - classifier.fit(trainingDataset, paramMap) + if (weightColIsUsed) { +val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol] +paramMap.put(classifier_.weightCol -> getWeightCol) +classifier_.fit(trainingDataset, paramMap) + }
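Below is a hedged usage sketch of the `setWeightCol` setter added by this patch; the libsvm input path and the constant weight column are assumptions for illustration only.

```scala
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().appName("OneVsRestWeightCol").master("local[*]").getOrCreate()

// Assumed input: a libsvm file with label/features; the constant weight column is
// only for illustration -- real instance weights would come from the data.
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
  .withColumn("weight", lit(1.0))

// LogisticRegression mixes in HasWeightCol, so the weight column is actually used;
// for classifiers without a weightCol param the setting is ignored with a warning.
val ovr = new OneVsRest()
  .setClassifier(new LogisticRegression().setMaxIter(10))
  .setWeightCol("weight")

val ovrModel = ovr.fit(training)
```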
spark git commit: [SPARK-21306][ML] For branch 2.1, OneVsRest should support setWeightCol
Repository: spark Updated Branches: refs/heads/branch-2.1 444cca14d -> 9b749b6ce [SPARK-21306][ML] For branch 2.1, OneVsRest should support setWeightCol The PR is related to #18554, and is modified for branch 2.1. ## What changes were proposed in this pull request? add `setWeightCol` method for OneVsRest. `weightCol` is ignored if classifier doesn't inherit HasWeightCol trait. ## How was this patch tested? + [x] add an unit test. Author: Yan Facai (é¢åæ)Closes #18763 from facaiy/BUG/branch-2.1_OneVsRest_support_setWeightCol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b749b6c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b749b6c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b749b6c Branch: refs/heads/branch-2.1 Commit: 9b749b6ce6b86caf8a73d6993490fc140b9ad282 Parents: 444cca1 Author: Yan Facai (é¢åæ) Authored: Tue Aug 8 11:05:36 2017 +0800 Committer: Yanbo Liang Committed: Tue Aug 8 11:05:36 2017 +0800 -- .../spark/ml/classification/OneVsRest.scala | 39 ++-- .../ml/classification/OneVsRestSuite.scala | 10 + python/pyspark/ml/classification.py | 27 +++--- python/pyspark/ml/tests.py | 14 +++ 4 files changed, 81 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b749b6c/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index e58b30d..c4a8f1f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -34,6 +34,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param.shared.HasWeightCol import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -53,7 +54,8 @@ private[ml] trait ClassifierTypeTrait { /** * Params for [[OneVsRest]]. */ -private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTrait { +private[ml] trait OneVsRestParams extends PredictorParams + with ClassifierTypeTrait with HasWeightCol { /** * param for the base binary classifier that we reduce multiclass classification into. @@ -299,6 +301,18 @@ final class OneVsRest @Since("1.4.0") ( @Since("1.5.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + /** + * Sets the value of param [[weightCol]]. + * + * This is ignored if weight is not supported by [[classifier]]. + * If this is not set or empty, we treat all instance weights as 1.0. + * Default is not set, so all instances have weight one. 
+ * + * @group setParam + */ + @Since("2.3.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + @Since("1.4.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType) @@ -317,7 +331,20 @@ final class OneVsRest @Since("1.4.0") ( } val numClasses = MetadataUtils.getNumClasses(labelSchema).fold(computeNumClasses())(identity) -val multiclassLabeled = dataset.select($(labelCol), $(featuresCol)) +val weightColIsUsed = isDefined(weightCol) && $(weightCol).nonEmpty && { + getClassifier match { +case _: HasWeightCol => true +case c => + logWarning(s"weightCol is ignored, as it is not supported by $c now.") + false + } +} + +val multiclassLabeled = if (weightColIsUsed) { + dataset.select($(labelCol), $(featuresCol), $(weightCol)) +} else { + dataset.select($(labelCol), $(featuresCol)) +} // persist if underlying dataset is not persistent. val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE @@ -337,7 +364,13 @@ final class OneVsRest @Since("1.4.0") ( paramMap.put(classifier.labelCol -> labelColName) paramMap.put(classifier.featuresCol -> getFeaturesCol) paramMap.put(classifier.predictionCol -> getPredictionCol) - classifier.fit(trainingDataset, paramMap) + if (weightColIsUsed) { +val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol] +paramMap.put(classifier_.weightCol -> getWeightCol) +classifier_.fit(trainingDataset, paramMap) + }
spark git commit: [SPARK-19270][FOLLOW-UP][ML] PySpark GLR model.summary should return a printable representation.
Repository: spark Updated Branches: refs/heads/master fdcee028a -> f763d8464 [SPARK-19270][FOLLOW-UP][ML] PySpark GLR model.summary should return a printable representation. ## What changes were proposed in this pull request? PySpark GLR ```model.summary``` should return a printable representation by calling Scala ```toString```. ## How was this patch tested? ``` from pyspark.ml.regression import GeneralizedLinearRegression dataset = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt") glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3) model = glr.fit(dataset) model.summary ``` Before this PR: ![image](https://user-images.githubusercontent.com/1962026/29021059-e221633e-7b96-11e7-8d77-5d53f89c81a9.png) After this PR: ![image](https://user-images.githubusercontent.com/1962026/29021097-fce80fa6-7b96-11e7-8ab4-7e113d447d5d.png) Author: Yanbo LiangCloses #18870 from yanboliang/spark-19270. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f763d846 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f763d846 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f763d846 Branch: refs/heads/master Commit: f763d8464b32852d7fd33e962e5476a7f03bc6c6 Parents: fdcee02 Author: Yanbo Liang Authored: Tue Aug 8 08:43:58 2017 +0800 Committer: Yanbo Liang Committed: Tue Aug 8 08:43:58 2017 +0800 -- python/pyspark/ml/regression.py | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f763d846/python/pyspark/ml/regression.py -- diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 2cc6234..72374ac 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -1745,6 +1745,9 @@ class GeneralizedLinearRegressionTrainingSummary(GeneralizedLinearRegressionSumm """ return self._call_java("pValues") +def __repr__(self): +return self._call_java("toString") + if __name__ == "__main__": import doctest - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21542][ML][PYTHON] Python persistence helper functions
Repository: spark Updated Branches: refs/heads/master baf5cac0f -> fdcee028a [SPARK-21542][ML][PYTHON] Python persistence helper functions ## What changes were proposed in this pull request? Added DefaultParamsWriteable, DefaultParamsReadable, DefaultParamsWriter, and DefaultParamsReader to Python to support Python-only persistence of Json-serializable parameters. ## How was this patch tested? Instantiated an estimator with Json-serializable parameters (ex. LogisticRegression), saved it using the added helper functions, and loaded it back, and compared it to the original instance to make sure it is the same. This test was both done in the Python REPL and implemented in the unit tests. Note to reviewers: there are a few excess comments that I left in the code for clarity but will remove before the code is merged to master. Author: Ajay SainiCloses #18742 from ajaysaini725/PythonPersistenceHelperFunctions. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdcee028 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdcee028 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdcee028 Branch: refs/heads/master Commit: fdcee028afa7a7ac0f8bd8f59ee4933d7caea064 Parents: baf5cac Author: Ajay Saini Authored: Mon Aug 7 17:03:20 2017 -0700 Committer: Joseph K. Bradley Committed: Mon Aug 7 17:03:20 2017 -0700 -- .../org/apache/spark/ml/util/ReadWrite.scala| 37 ++- python/pyspark/ml/param/__init__.py | 11 + python/pyspark/ml/pipeline.py | 10 - python/pyspark/ml/tests.py | 34 +++ python/pyspark/ml/util.py | 302 +-- 5 files changed, 342 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fdcee028/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index b54e258..65f142c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -96,21 +96,7 @@ abstract class MLWriter extends BaseReadWrite with Logging { @Since("1.6.0") @throws[IOException]("If the input path already exists but overwrite is not enabled.") def save(path: String): Unit = { -val hadoopConf = sc.hadoopConfiguration -val outputPath = new Path(path) -val fs = outputPath.getFileSystem(hadoopConf) -val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) -if (fs.exists(qualifiedOutputPath)) { - if (shouldOverwrite) { -logInfo(s"Path $path already exists. It will be overwritten.") -// TODO: Revert back to the original content if save is not successful. -fs.delete(qualifiedOutputPath, true) - } else { -throw new IOException(s"Path $path already exists. 
To overwrite it, " + - s"please use write.overwrite().save(path) for Scala and use " + - s"write().overwrite().save(path) for Java and Python.") - } -} +new FileSystemOverwrite().handleOverwrite(path, shouldOverwrite, sc) saveImpl(path) } @@ -471,3 +457,24 @@ private[ml] object MetaAlgorithmReadWrite { List((instance.uid, instance)) ++ subStageMaps } } + +private[ml] class FileSystemOverwrite extends Logging { + + def handleOverwrite(path: String, shouldOverwrite: Boolean, sc: SparkContext): Unit = { +val hadoopConf = sc.hadoopConfiguration +val outputPath = new Path(path) +val fs = outputPath.getFileSystem(hadoopConf) +val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) +if (fs.exists(qualifiedOutputPath)) { + if (shouldOverwrite) { +logInfo(s"Path $path already exists. It will be overwritten.") +// TODO: Revert back to the original content if save is not successful. +fs.delete(qualifiedOutputPath, true) + } else { +throw new IOException(s"Path $path already exists. To overwrite it, " + + s"please use write.overwrite().save(path) for Scala and use " + + s"write().overwrite().save(path) for Java and Python.") + } +} + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/fdcee028/python/pyspark/ml/param/__init__.py -- diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py index 4583ae8..1334207 100644 --- a/python/pyspark/ml/param/__init__.py +++ b/python/pyspark/ml/param/__init__.py @@ -384,6 +384,17 @@
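For reference, a small Scala sketch of the save/overwrite semantics that the extracted `FileSystemOverwrite` helper enforces (the Python helpers added here mirror the same behavior); the output path is a placeholder.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

// An active SparkSession is required because MLWriter uses the SparkContext.
val spark = SparkSession.builder().appName("MLPersistence").master("local[*]").getOrCreate()

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

// A plain save throws IOException if the path already exists ...
lr.write.save("/tmp/lr-estimator")

// ... while overwrite() routes through FileSystemOverwrite.handleOverwrite and
// deletes the existing directory before writing.
lr.write.overwrite().save("/tmp/lr-estimator")

val restored = LogisticRegression.load("/tmp/lr-estimator")
```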
spark git commit: [SPARK-18535][SPARK-19720][CORE][BACKPORT-2.1] Redact sensitive information
Repository: spark Updated Branches: refs/heads/branch-2.1 5634fadb0 -> 444cca14d [SPARK-18535][SPARK-19720][CORE][BACKPORT-2.1] Redact sensitive information ## What changes were proposed in this pull request? Backporting SPARK-18535 and SPARK-19720 to spark 2.1 It's a backport PR that redacts senstive information by configuration to Spark UI and Spark Submit console logs. Using reference from Mark Grover markapache.org PRs ## How was this patch tested? Same tests from PR applied Author: Mark GroverCloses #18802 from dmvieira/feature-redact. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/444cca14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/444cca14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/444cca14 Branch: refs/heads/branch-2.1 Commit: 444cca14d7ac8c5ab5d7e9d080b11f4d6babe3bf Parents: 5634fad Author: Mark Grover Authored: Mon Aug 7 14:23:05 2017 -0700 Committer: Marcelo Vanzin Committed: Mon Aug 7 14:23:05 2017 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 3 +- .../spark/deploy/SparkSubmitArguments.scala | 12 +-- .../apache/spark/internal/config/package.scala | 9 ++ .../spark/scheduler/EventLoggingListener.scala | 13 +++- .../apache/spark/ui/env/EnvironmentPage.scala | 12 +++ .../apache/spark/ui/env/EnvironmentTab.scala| 1 + .../scala/org/apache/spark/util/Utils.scala | 33 +++- .../scheduler/EventLoggingListenerSuite.scala | 12 +++ .../org/apache/spark/util/UtilsSuite.scala | 20 docs/configuration.md | 9 ++ .../spark/deploy/yarn/ExecutorRunnable.scala| 3 +- 11 files changed, 111 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/444cca14/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 443f1f5..653830e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -670,7 +670,8 @@ object SparkSubmit { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") - printStream.println(s"System properties:\n${sysProps.mkString("\n")}") + // sysProps may contain sensitive information, so redact before printing + printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}") printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") printStream.println("\n") } http://git-wip-us.apache.org/repos/asf/spark/blob/444cca14/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f1761e7..883842c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -84,9 +84,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S // scalastyle:off println if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile") Option(propertiesFile).foreach { filename => - Utils.getPropertiesFromFile(filename).foreach { case (k, v) => + val properties = Utils.getPropertiesFromFile(filename) + properties.foreach { case (k, v) => defaultProperties(k) = v -if (verbose) 
SparkSubmit.printStream.println(s"Adding default property: $k=$v") + } + // Property files may contain sensitive information, so redact before printing + if (verbose) { +Utils.redact(properties).foreach { case (k, v) => + SparkSubmit.printStream.println(s"Adding default property: $k=$v") +} } } // scalastyle:on println @@ -318,7 +324,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |Spark properties used, including those specified through | --conf and those from the properties file $propertiesFile: -|${sparkProperties.mkString(" ", "\n ", "\n")} +|${Utils.redact(sparkProperties).mkString(" ", "\n ", "\n")} """.stripMargin } http://git-wip-us.apache.org/repos/asf/spark/blob/444cca14/core/src/main/scala/org/apache/spark/internal/config/package.scala
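A hedged sketch of how the redaction is driven from the user side; the regex value and the dummy password key are illustrative, and `spark.redaction.regex` is the configuration key introduced by SPARK-18535.

```scala
import org.apache.spark.sql.SparkSession

// spark.redaction.regex controls which config keys are redacted in the UI and in
// verbose spark-submit output; the default pattern covers keys containing
// "secret" or "password".
val spark = SparkSession.builder()
  .appName("RedactionExample")
  .master("local[*]")
  .config("spark.redaction.regex", "(?i)secret|password|token") // illustrative pattern
  .config("spark.my.service.password", "not-shown-in-the-ui")   // value appears redacted
  .getOrCreate()
```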
spark git commit: [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.
Repository: spark Updated Branches: refs/heads/branch-2.2 fa92a7be7 -> a1c1199e1 [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided. ### What changes were proposed in this pull request? ```SQL CREATE TABLE mytesttable1 USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}=${jdbcPassword}', dbtable 'mytesttable1', paritionColumn 'state_id', lowerBound '0', upperBound '52', numPartitions '53', fetchSize '1' ) ``` The above option name `paritionColumn` is wrong. That mean, users did not provide the value for `partitionColumn`. In such case, users hit a confusing error. ``` AssertionError: assertion failed java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312) ``` ### How was this patch tested? Added a test case Author: gatorsmileCloses #18864 from gatorsmile/jdbcPartCol. (cherry picked from commit baf5cac0f8c35925c366464d7e0eb5f6023fce57) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1c1199e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1c1199e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1c1199e Branch: refs/heads/branch-2.2 Commit: a1c1199e122889ed34415be5e4da67168107a595 Parents: fa92a7b Author: gatorsmile Authored: Mon Aug 7 13:04:04 2017 -0700 Committer: gatorsmile Committed: Mon Aug 7 13:04:22 2017 -0700 -- .../datasources/jdbc/JDBCOptions.scala | 11 ++ .../datasources/jdbc/JdbcRelationProvider.scala | 9 ++-- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 22 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 5 +++-- 4 files changed, 39 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 591096d..96a8a51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -97,10 +97,13 @@ class JDBCOptions( val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong) // the upper bound of the partition column val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong) - require(partitionColumn.isEmpty || -(lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined), -s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," + - s" and '$JDBC_NUM_PARTITIONS' are required.") + // numPartitions is also used for data source writing + require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) || +(partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined && + numPartitions.isDefined), +s"When reading JDBC data sources, users need to specify all or none for the following " + + s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + + s"and '$JDBC_NUM_PARTITIONS'") val fetchSize = { val size = 
parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt require(size >= 0, http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 74dcfb0..37e7bb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +import JDBCOptions._ +
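For contrast with the misspelled `paritionColumn` above, a sketch of a parallel JDBC read where the four related options are supplied together, as the new `require` message demands; the URL, table, and credentials are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("JdbcPartitionedRead").master("local[*]").getOrCreate()

// partitionColumn, lowerBound, upperBound and numPartitions must be supplied
// together (or all omitted); misspelling one of them now yields the clearer
// error message added in this patch instead of a bare assertion failure.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb") // placeholder URL; the driver jar must be on the classpath
  .option("dbtable", "mytesttable1")
  .option("user", "dbuser")
  .option("password", "dbpass")
  .option("partitionColumn", "state_id")
  .option("lowerBound", "0")
  .option("upperBound", "52")
  .option("numPartitions", "53")
  .option("fetchsize", "1000")
  .load()
```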
spark git commit: [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.
Repository: spark Updated Branches: refs/heads/master cce25b360 -> baf5cac0f [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided. ### What changes were proposed in this pull request? ```SQL CREATE TABLE mytesttable1 USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}=${jdbcPassword}', dbtable 'mytesttable1', paritionColumn 'state_id', lowerBound '0', upperBound '52', numPartitions '53', fetchSize '1' ) ``` The above option name `paritionColumn` is wrong. That mean, users did not provide the value for `partitionColumn`. In such case, users hit a confusing error. ``` AssertionError: assertion failed java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312) ``` ### How was this patch tested? Added a test case Author: gatorsmileCloses #18864 from gatorsmile/jdbcPartCol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baf5cac0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baf5cac0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baf5cac0 Branch: refs/heads/master Commit: baf5cac0f8c35925c366464d7e0eb5f6023fce57 Parents: cce25b3 Author: gatorsmile Authored: Mon Aug 7 13:04:04 2017 -0700 Committer: gatorsmile Committed: Mon Aug 7 13:04:04 2017 -0700 -- .../datasources/jdbc/JDBCOptions.scala | 11 ++ .../datasources/jdbc/JdbcRelationProvider.scala | 9 ++-- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 22 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 5 +++-- 4 files changed, 39 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 591096d..96a8a51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -97,10 +97,13 @@ class JDBCOptions( val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong) // the upper bound of the partition column val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong) - require(partitionColumn.isEmpty || -(lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined), -s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," + - s" and '$JDBC_NUM_PARTITIONS' are required.") + // numPartitions is also used for data source writing + require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) || +(partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined && + numPartitions.isDefined), +s"When reading JDBC data sources, users need to specify all or none for the following " + + s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + + s"and '$JDBC_NUM_PARTITIONS'") val fetchSize = { val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt require(size >= 0, 
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 74dcfb0..37e7bb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +import JDBCOptions._ + val jdbcOptions = new JDBCOptions(parameters) val partitionColumn = jdbcOptions.partitionColumn val lowerBound
spark git commit: [SPARK-21565][SS] Propagate metadata in attribute replacement.
Repository: spark Updated Branches: refs/heads/branch-2.2 43f9c84b6 -> fa92a7be7 [SPARK-21565][SS] Propagate metadata in attribute replacement. ## What changes were proposed in this pull request? Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes. ## How was this patch tested? new unit test, which was verified to fail before the fix Author: Jose TorresCloses #18840 from joseph-torres/SPARK-21565. (cherry picked from commit cce25b360ee9e39d9510134c73a1761475eaf4ac) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa92a7be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa92a7be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa92a7be Branch: refs/heads/branch-2.2 Commit: fa92a7be709e78db8e8f50dca8e13855c1034fde Parents: 43f9c84 Author: Jose Torres Authored: Mon Aug 7 12:27:16 2017 -0700 Committer: Shixiong Zhu Committed: Mon Aug 7 12:27:30 2017 -0700 -- .../execution/streaming/StreamExecution.scala | 3 ++- .../sql/streaming/EventTimeWatermarkSuite.scala | 28 2 files changed, 30 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa92a7be/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b6ddf74..63c4dc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -631,7 +631,8 @@ class StreamExecution( // Rewire the plan to use the new attributes that were returned by the source. 
val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { - case a: Attribute if replacementMap.contains(a) => replacementMap(a) + case a: Attribute if replacementMap.contains(a) => +replacementMap(a).withMetadata(a.metadata) case ct: CurrentTimestamp => CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) http://git-wip-us.apache.org/repos/asf/spark/blob/fa92a7be/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 552911f..4f19fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche checkDataset[Long](df, 1L to 100L: _*) } + test("SPARK-21565: watermark operator accepts attributes from replacement") { +withTempDir { dir => + dir.delete() + + val df = Seq(("a", 100.0, new java.sql.Timestamp(100L))) +.toDF("symbol", "price", "eventTime") + df.write.json(dir.getCanonicalPath) + + val input = spark.readStream.schema(df.schema) +.json(dir.getCanonicalPath) + + val groupEvents = input +.withWatermark("eventTime", "2 seconds") +.groupBy("symbol", "eventTime") +.agg(count("price") as 'count) +.select("symbol", "eventTime", "count") + val q = groupEvents.writeStream +.outputMode("append") +.format("console") +.start() + try { +q.processAllAvailable() + } finally { +q.stop() + } +} + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21565][SS] Propagate metadata in attribute replacement.
Repository: spark Updated Branches: refs/heads/master 4f7ec3a31 -> cce25b360 [SPARK-21565][SS] Propagate metadata in attribute replacement. ## What changes were proposed in this pull request? Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes. ## How was this patch tested? new unit test, which was verified to fail before the fix Author: Jose TorresCloses #18840 from joseph-torres/SPARK-21565. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cce25b36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cce25b36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cce25b36 Branch: refs/heads/master Commit: cce25b360ee9e39d9510134c73a1761475eaf4ac Parents: 4f7ec3a Author: Jose Torres Authored: Mon Aug 7 12:27:16 2017 -0700 Committer: Shixiong Zhu Committed: Mon Aug 7 12:27:16 2017 -0700 -- .../execution/streaming/StreamExecution.scala | 3 ++- .../sql/streaming/EventTimeWatermarkSuite.scala | 28 2 files changed, 30 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5711262..1528e7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -628,7 +628,8 @@ class StreamExecution( // Rewire the plan to use the new attributes that were returned by the source. 
val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { - case a: Attribute if replacementMap.contains(a) => replacementMap(a) + case a: Attribute if replacementMap.contains(a) => +replacementMap(a).withMetadata(a.metadata) case ct: CurrentTimestamp => CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 552911f..4f19fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche checkDataset[Long](df, 1L to 100L: _*) } + test("SPARK-21565: watermark operator accepts attributes from replacement") { +withTempDir { dir => + dir.delete() + + val df = Seq(("a", 100.0, new java.sql.Timestamp(100L))) +.toDF("symbol", "price", "eventTime") + df.write.json(dir.getCanonicalPath) + + val input = spark.readStream.schema(df.schema) +.json(dir.getCanonicalPath) + + val groupEvents = input +.withWatermark("eventTime", "2 seconds") +.groupBy("symbol", "eventTime") +.agg(count("price") as 'count) +.select("symbol", "eventTime", "count") + val q = groupEvents.writeStream +.outputMode("append") +.format("console") +.start() + try { +q.processAllAvailable() + } finally { +q.stop() + } +} + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21374][CORE] Fix reading globbed paths from S3 into DF with disabled FS cache
Repository: spark Updated Branches: refs/heads/branch-2.2 4f0eb0c86 -> 43f9c84b6 [SPARK-21374][CORE] Fix reading globbed paths from S3 into DF with disabled FS cache This PR replaces #18623 to do some clean up. Closes #18623 Jenkins Author: Shixiong ZhuAuthor: Andrey Taptunov Closes #18848 from zsxwing/review-pr18623. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f9c84b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f9c84b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f9c84b Branch: refs/heads/branch-2.2 Commit: 43f9c84b6749b2ebf802e1f062238167b2b1f3bb Parents: 4f0eb0c Author: Andrey Taptunov Authored: Fri Aug 4 22:40:04 2017 -0700 Committer: Shixiong Zhu Committed: Mon Aug 7 11:04:32 2017 -0700 -- .../apache/spark/deploy/SparkHadoopUtil.scala | 8 .../sql/execution/datasources/DataSource.scala | 45 2 files changed, 36 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43f9c84b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6afe58b..550bd68 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -227,6 +227,10 @@ class SparkHadoopUtil extends Logging { def globPath(pattern: Path): Seq[Path] = { val fs = pattern.getFileSystem(conf) +globPath(fs, pattern) + } + + def globPath(fs: FileSystem, pattern: Path): Seq[Path] = { Option(fs.globStatus(pattern)).map { statuses => statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq }.getOrElse(Seq.empty[Path]) @@ -236,6 +240,10 @@ class SparkHadoopUtil extends Logging { if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern) } + def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = { +if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern) + } + /** * Lists all the files in a directory with the specified prefix, and does not end with the * given suffix. 
The returned {{FileStatus}} instances are sorted by the modification times of http://git-wip-us.apache.org/repos/asf/spark/blob/43f9c84b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0915bd3..a13bb24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -123,7 +124,7 @@ case class DataSource( val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) -SparkHadoopUtil.get.globPathIfNecessary(qualified) +SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) }.toArray new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache) } @@ -345,22 +346,8 @@ case class DataSource( case (format: FileFormat, _) => val allPaths = caseInsensitiveOptions.get("path") ++ paths val hadoopConf = sparkSession.sessionState.newHadoopConf() -val globbedPaths = allPaths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified) - - if (globPath.isEmpty) { -throw new AnalysisException(s"Path does not exist: $qualified") - } - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { -throw new AnalysisException(s"Path does not exist: ${globPath.head}") -
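A hedged sketch of the scenario this change fixes, reading a globbed path with the Hadoop FileSystem cache disabled; the bucket, glob, and s3a setup are assumptions about the deployment, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("GlobbedS3Read").master("local[*]").getOrCreate()

// Disable the Hadoop FileSystem cache for s3a (the scenario this patch addresses);
// hadoop-aws and S3 credentials are assumed to be configured separately.
spark.sparkContext.hadoopConfiguration.setBoolean("fs.s3a.impl.disable.cache", true)

// The glob is now resolved against the FileSystem created for the qualified path,
// so the read works even though no cached FileSystem instance exists.
val logs = spark.read.json("s3a://my-bucket/logs/2017-08-*/part-*")
logs.printSchema()
```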
spark git commit: [SPARK][DOCS] Added note on meaning of position to substring function
Repository: spark Updated Branches: refs/heads/master bbfd6b5d2 -> 4f7ec3a31 [SPARK][DOCS] Added note on meaning of position to substring function ## What changes were proposed in this pull request? Enhanced some existing documentation Please review http://spark.apache.org/contributing.html before opening a pull request. Author: MacCloses #18710 from maclockard/maclockard-patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7ec3a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7ec3a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7ec3a3 Branch: refs/heads/master Commit: 4f7ec3a316fd2c6e0828d8777d29abb472fd7a14 Parents: bbfd6b5 Author: Mac Authored: Mon Aug 7 17:16:03 2017 +0100 Committer: Sean Owen Committed: Mon Aug 7 17:16:03 2017 +0100 -- python/pyspark/sql/functions.py | 4 +++- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f7ec3a3/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 2c8c8e2..0e76182 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1424,7 +1424,9 @@ def substring(str, pos, len): """ Substring starts at `pos` and is of length `len` when str is String type or returns the slice of byte array that starts at `pos` in byte and is of length `len` -when str is Binary type +when str is Binary type. + +.. note:: The position is not zero based, but 1 based index. >>> df = spark.createDataFrame([('abcd',)], ['s',]) >>> df.select(substring(df.s, 1, 2).alias('s')).collect() http://git-wip-us.apache.org/repos/asf/spark/blob/4f7ec3a3/sql/core/src/main/scala/org/apache/spark/sql/functions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index ccff00e..496619a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2408,6 +2408,8 @@ object functions { * returns the slice of byte array that starts at `pos` in byte and is of length `len` * when str is Binary type * + * @note The position is not zero based, but 1 based index. + * * @group string_funcs * @since 1.5.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
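A small Scala sketch of the 1-based indexing the added note documents; it mirrors the DataFrame used in the Python doctest.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.substring

val spark = SparkSession.builder().appName("SubstringExample").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("abcd").toDF("s")

// Position 1 is the first character, so this returns "ab" (not "bc").
df.select(substring($"s", 1, 2).alias("s")).show()
```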
spark git commit: [SPARK-21647][SQL] Fix SortMergeJoin when using CROSS
Repository: spark Updated Branches: refs/heads/branch-2.2 7a04def92 -> 4f0eb0c86 [SPARK-21647][SQL] Fix SortMergeJoin when using CROSS ### What changes were proposed in this pull request? author: BoleynSu closes https://github.com/apache/spark/pull/18836 ```Scala val df = Seq((1, 1)).toDF("i", "j") df.createOrReplaceTempView("T") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " + "cross join T t2 where t2.i = t1.i").explain(true) } ``` The above code could cause the following exception: ``` SortMergeJoinExec should not take Cross as the JoinType java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100) ``` Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix the issue. ### How was this patch tested? Modified the two existing test cases. Author: Xiao LiAuthor: Boleyn Su Closes #18863 from gatorsmile/pr-18836. (cherry picked from commit bbfd6b5d24be5919a3ab1ac3eaec46e33201df39) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f0eb0c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f0eb0c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f0eb0c8 Branch: refs/heads/branch-2.2 Commit: 4f0eb0c862c0362b14fc5db468f4fc08fb8a08c6 Parents: 7a04def Author: Xiao Li Authored: Tue Aug 8 00:00:01 2017 +0800 Committer: Wenchen Fan Committed: Tue Aug 8 00:00:16 2017 +0800 -- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 36 +++- 2 files changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f0eb0c8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index a772015..bd5b633 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -82,7 +82,7 @@ case class SortMergeJoinExec( override def outputOrdering: Seq[SortOrder] = joinType match { // For inner join, orders of both sides keys should be kept. 
-case Inner => +case _: InnerLike => val leftKeyOrdering = getKeyOrdering(leftKeys, left.outputOrdering) val rightKeyOrdering = getKeyOrdering(rightKeys, right.outputOrdering) leftKeyOrdering.zip(rightKeyOrdering).map { case (lKey, rKey) => http://git-wip-us.apache.org/repos/asf/spark/blob/4f0eb0c8/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 4d155d5..63e17c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.InMemoryRelation @@ -513,26 +513,30 @@ class PlannerSuite extends SharedSQLContext { } test("EnsureRequirements skips sort when either side of join keys is required after inner SMJ") { -val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB) -// Both left and right keys should be sorted after the SMJ. -Seq(orderingA, orderingB).foreach { ordering => - assertSortRequirementsAreSatisfied( -childPlan = innerSmj, -requiredOrdering = Seq(ordering), -shouldHaveSort = false) +Seq(Inner, Cross).foreach { joinType => + val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB ::
spark git commit: [SPARK-21647][SQL] Fix SortMergeJoin when using CROSS
Repository: spark Updated Branches: refs/heads/master 8b69b17f3 -> bbfd6b5d2 [SPARK-21647][SQL] Fix SortMergeJoin when using CROSS ### What changes were proposed in this pull request? author: BoleynSu closes https://github.com/apache/spark/pull/18836 ```Scala val df = Seq((1, 1)).toDF("i", "j") df.createOrReplaceTempView("T") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " + "cross join T t2 where t2.i = t1.i").explain(true) } ``` The above code could cause the following exception: ``` SortMergeJoinExec should not take Cross as the JoinType java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100) ``` Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix the issue. ### How was this patch tested? Modified the two existing test cases. Author: Xiao LiAuthor: Boleyn Su Closes #18863 from gatorsmile/pr-18836. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbfd6b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbfd6b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbfd6b5d Branch: refs/heads/master Commit: bbfd6b5d24be5919a3ab1ac3eaec46e33201df39 Parents: 8b69b17 Author: Xiao Li Authored: Tue Aug 8 00:00:01 2017 +0800 Committer: Wenchen Fan Committed: Tue Aug 8 00:00:01 2017 +0800 -- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 36 +++- 2 files changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bbfd6b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 639b8e0..f41fa14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -82,7 +82,7 @@ case class SortMergeJoinExec( override def outputOrdering: Seq[SortOrder] = joinType match { // For inner join, orders of both sides keys should be kept. 
-case Inner => +case _: InnerLike => val leftKeyOrdering = getKeyOrdering(leftKeys, left.outputOrdering) val rightKeyOrdering = getKeyOrdering(rightKeys, right.outputOrdering) leftKeyOrdering.zip(rightKeyOrdering).map { case (lKey, rKey) => http://git-wip-us.apache.org/repos/asf/spark/blob/bbfd6b5d/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 4d155d5..63e17c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.InMemoryRelation @@ -513,26 +513,30 @@ class PlannerSuite extends SharedSQLContext { } test("EnsureRequirements skips sort when either side of join keys is required after inner SMJ") { -val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB) -// Both left and right keys should be sorted after the SMJ. -Seq(orderingA, orderingB).foreach { ordering => - assertSortRequirementsAreSatisfied( -childPlan = innerSmj, -requiredOrdering = Seq(ordering), -shouldHaveSort = false) +Seq(Inner, Cross).foreach { joinType => + val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA, planB) + // Both left and right keys should be sorted after the SMJ. + Seq(orderingA,
spark git commit: [SPARK-21544][DEPLOY][TEST-MAVEN] Tests jar of some module should not upload twice
Repository: spark Updated Branches: refs/heads/master 1426eea84 -> 8b69b17f3 [SPARK-21544][DEPLOY][TEST-MAVEN] Tests jar of some module should not upload twice ## What changes were proposed in this pull request? **For the modules below:** common/network-common streaming sql/core sql/catalyst **the tests.jar is installed or deployed twice, like:** `[DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml [INFO] Installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar [DEBUG] Skipped re-installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, seems unchanged` **The reason is below:** `[DEBUG] (f) artifact = org.apache.spark:spark-streaming_2.11:jar:2.1.0-mdh2.1.0.1-SNAPSHOT [DEBUG] (f) attachedArtifacts = [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0-mdh2.1.0.1-SNAPSHOT]` When executing 'mvn deploy' to Nexus during a release, the second upload fails because artifacts in a release Nexus repository cannot be overwritten. ## How was this patch tested? Execute 'mvn clean install -Pyarn -Phadoop-2.6 -Phadoop-provided -DskipTests' Author: zhoukang Closes #18745 from caneGuy/zhoukang/fix-installtwice.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b69b17f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b69b17f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b69b17f Branch: refs/heads/master Commit: 8b69b17f3fde2c890068a5a9ef490fe675cc80c1 Parents: 1426eea Author: zhoukang Authored: Mon Aug 7 12:51:39 2017 +0100 Committer: Sean Owen Committed: Mon Aug 7 12:51:39 2017 +0100 -- common/network-common/pom.xml | 2 +- sql/catalyst/pom.xml | 7 +-- sql/core/pom.xml | 7 +-- 3 files changed, 3 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b69b17f/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 0254d0c..ccd8504 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -126,7 +126,7 @@ maven-jar-plugin -test-jar-on-test-compile +prepare-test-jar test-compile test-jar http://git-wip-us.apache.org/repos/asf/spark/blob/8b69b17f/sql/catalyst/pom.xml -- diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 0bbf7a9..fce8149 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -118,12 +118,7 @@ maven-jar-plugin - - test-jar - - - -test-jar-on-test-compile +prepare-test-jar test-compile test-jar http://git-wip-us.apache.org/repos/asf/spark/blob/8b69b17f/sql/core/pom.xml -- diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 661c31d..25004e5 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -174,12 +174,7 @@ maven-jar-plugin - - test-jar - - - -test-jar-on-test-compile +prepare-test-jar test-compile test-jar - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21623][ML] fix RF doc
Repository: spark Updated Branches: refs/heads/master 663f30d14 -> 1426eea84 [SPARK-21623][ML] fix RF doc ## What changes were proposed in this pull request? The comments on parentStats in RF are wrong: parentStats is not only used for the first iteration, it is used in every iteration for unordered features. ## How was this patch tested? Author: Peng MengCloses #18832 from mpjlu/fixRFDoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1426eea8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1426eea8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1426eea8 Branch: refs/heads/master Commit: 1426eea84c544000273d176514532cb7f7015cea Parents: 663f30d Author: Peng Meng Authored: Mon Aug 7 11:03:07 2017 +0100 Committer: Sean Owen Committed: Mon Aug 7 11:03:07 2017 +0100 -- .../org/apache/spark/ml/tree/impl/DTStatsAggregator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1426eea8/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DTStatsAggregator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DTStatsAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DTStatsAggregator.scala index 61091bb..5aeea14 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DTStatsAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DTStatsAggregator.scala @@ -78,9 +78,9 @@ private[spark] class DTStatsAggregator( /** * Array of parent node sufficient stats. - * - * Note: this is necessary because stats for the parent node are not available - * on the first iteration of tree learning. + * Note: parent stats need to be explicitly tracked in the [[DTStatsAggregator]] for unordered + * categorical features, because the parent [[Node]] object does not have [[ImpurityStats]] + * on the first iteration. */ private val parentStats: Array[Double] = new Array[Double](statsSize) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
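To make the corrected comment concrete: for unordered categorical features a candidate split is an arbitrary subset of categories, and the aggregator accumulates sufficient stats only for the "left" subset, so the right child has to be derived by subtracting from the parent's stats every time a split is evaluated; this is why parentStats must be tracked explicitly beyond the first iteration. A rough sketch of that idea using plain label-count arrays rather than the real `DTStatsAggregator` API (the helper names are illustrative assumptions):

```scala
// Illustrative sketch: sufficient stats modeled as per-class label counts.
object ParentStatsSketch {
  type Stats = Array[Double]

  // Right-child stats are recovered as parent - left, which requires the
  // parent stats to be available whenever a candidate split is evaluated.
  def rightFromParent(parent: Stats, left: Stats): Stats =
    parent.zip(left).map { case (p, l) => p - l }

  // Gini impurity over label counts, used only to complete the example.
  def gini(counts: Stats): Double = {
    val total = counts.sum
    if (total == 0.0) 0.0
    else 1.0 - counts.map(c => (c / total) * (c / total)).sum
  }

  def main(args: Array[String]): Unit = {
    val parentStats: Stats = Array(10.0, 10.0) // 10 samples of each class at the node
    val leftStats: Stats   = Array(6.0, 2.0)   // counts for one subset of categories
    val rightStats = rightFromParent(parentStats, leftStats)
    println(s"left gini = ${gini(leftStats)}, right gini = ${gini(rightStats)}")
  }
}
```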
spark-website git commit: Update Hive compatibility wording
Repository: spark-website Updated Branches: refs/heads/asf-site 889909689 -> eb51b33f0 Update Hive compatibility wording Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/eb51b33f Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/eb51b33f Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/eb51b33f Branch: refs/heads/asf-site Commit: eb51b33f07b633acc70534ca022f0952d5b00d66 Parents: 8899096 Author: Matei ZahariaAuthored: Sun Aug 6 12:46:03 2017 +0200 Committer: Matei Zaharia Committed: Sun Aug 6 12:46:03 2017 +0200 -- site/sql/index.html | 8 sql/index.md| 8 2 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/eb51b33f/site/sql/index.html -- diff --git a/site/sql/index.html b/site/sql/index.html index 49e8a55..9b84cda 100644 --- a/site/sql/index.html +++ b/site/sql/index.html @@ -250,13 +250,13 @@ -Hive Compatibility +Hive Integration - Run unmodified Hive queries on existing data. + Run SQL or HiveQL queries on existing warehouses. - Spark SQL reuses the Hive frontend and metastore, giving you full compatibility with - existing Hive data, queries, and UDFs. Simply install it alongside Hive. + Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing + you to access existing Hive warehouses. http://git-wip-us.apache.org/repos/asf/spark-website/blob/eb51b33f/sql/index.md -- diff --git a/sql/index.md b/sql/index.md index ac308e1..83a0092 100644 --- a/sql/index.md +++ b/sql/index.md @@ -61,13 +61,13 @@ subproject: SQL -Hive Compatibility +Hive Integration - Run unmodified Hive queries on existing data. + Run SQL or HiveQL queries on existing warehouses. - Spark SQL reuses the Hive frontend and metastore, giving you full compatibility with - existing Hive data, queries, and UDFs. Simply install it alongside Hive. + Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing + you to access existing Hive warehouses. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
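The reworded blurb ("Run SQL or HiveQL queries on existing warehouses") corresponds to enabling Hive support on a `SparkSession`. A minimal sketch, assuming Spark was built with Hive support, a Hive metastore is reachable, and a table called `logs` already exists there (the table name is purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

object HiveIntegrationExample {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() wires in the Hive metastore, HiveQL syntax, SerDes and UDFs.
    val spark = SparkSession.builder()
      .appName("HiveIntegrationExample")
      .enableHiveSupport()
      .getOrCreate()

    // Query an existing Hive table with plain SQL/HiveQL.
    val errorsPerLevel =
      spark.sql("SELECT level, COUNT(*) AS cnt FROM logs GROUP BY level")
    errorsPerLevel.show()

    spark.stop()
  }
}
```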
spark git commit: [SPARK-13041][MESOS] Adds sandbox uri to spark dispatcher ui
Repository: spark Updated Branches: refs/heads/master 534a063f7 -> 663f30d14 [SPARK-13041][MESOS] Adds sandbox uri to spark dispatcher ui ## What changes were proposed in this pull request? Adds a sandbox link per driver in the dispatcher UI with minimal changes after a bug was fixed here: https://issues.apache.org/jira/browse/MESOS-4992 The sandbox URI has the following format: `http://<proxy_uri>/#/slaves/<agent-id>/frameworks/<framework-id>/executors/<driver-id>/browse`. For DC/OS the proxy URI is `<dcos_url>/mesos`. For the DC/OS deployment scenario, and to make things easier, I introduced a new config property named `spark.mesos.proxy.baseURL` which should be passed to the dispatcher when launched using --conf. If no such configuration is detected then no sandbox URI is shown, and there is an empty column with a header (this can be changed so nothing is shown). Within DC/OS the base URL must be a property for the dispatcher that we should add in the future here: https://github.com/mesosphere/universe/blob/9e7c909c3b8680eeb0494f2a58d5746e3bab18c1/repo/packages/S/spark/26/config.json It is not easy to detect what that URI is in different environments, so the user should pass it. ## How was this patch tested? Tested with the Mesos test suite here: https://github.com/typesafehub/mesos-spark-integration-tests. The attached image shows the UI modification where the sandbox header is added. ![image](https://user-images.githubusercontent.com/7945591/27831630-2a3b447e-60d4-11e7-87bb-d057efd4efa7.png) Tested the URI redirection the way it was suggested here: https://issues.apache.org/jira/browse/MESOS-4992 Built Mesos 1.4 from the master branch and started the Mesos dispatcher with the command: `./sbin/start-mesos-dispatcher.sh --conf spark.mesos.proxy.baseURL=http://localhost:5050 -m mesos://127.0.0.1:5050` Ran a Spark example: `./bin/spark-submit --class org.apache.spark.examples.SparkPi --master mesos://10.10.1.79:7078 --deploy-mode cluster --executor-memory 2G --total-executor-cores 2 http:///spark-examples_2.11-2.1.1.jar 10` The sandbox URI is shown at the bottom of the page: ![image](https://user-images.githubusercontent.com/7945591/28599237-89d0a8c8-71b1-11e7-8f94-41ad117ceead.png) Redirection works as expected: ![image](https://user-images.githubusercontent.com/7945591/28599247-a5d65248-71b1-11e7-8b5e-a0ac2a79fa23.png) Author: Stavros Kontopoulos Closes #18528 from skonto/adds_the_sandbox_uri. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/663f30d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/663f30d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/663f30d1 Branch: refs/heads/master Commit: 663f30d14a0c9219e07697af1ab56e11a714d9a6 Parents: 534a063 Author: Stavros Kontopoulos Authored: Mon Aug 7 10:32:19 2017 +0100 Committer: Sean Owen Committed: Mon Aug 7 10:32:19 2017 +0100 -- docs/running-on-mesos.md | 2 ++ .../spark/deploy/mesos/ui/MesosClusterPage.scala | 14 +- 2 files changed, 15 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/663f30d1/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index cf257c0..ae38550 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -153,6 +153,8 @@ can find the results of the driver from the Mesos Web UI. To use cluster mode, you must start the `MesosClusterDispatcher` in your cluster via the `sbin/start-mesos-dispatcher.sh` script, passing in the Mesos master URL (e.g: mesos://host:5050). 
This starts the `MesosClusterDispatcher` as a daemon running on the host. +By setting the Mesos proxy config property (requires Mesos version >= 1.4), e.g. `--conf spark.mesos.proxy.baseURL=http://localhost:5050`, when launching the dispatcher, the Mesos sandbox URI for each driver is added to the Mesos dispatcher UI. + If you would like to run the `MesosClusterDispatcher` with Marathon, you need to run the `MesosClusterDispatcher` in the foreground (i.e. `bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`). Note that the `MesosClusterDispatcher` does not yet support multiple instances for HA. The `MesosClusterDispatcher` also supports writing recovery state into Zookeeper. This allows the `MesosClusterDispatcher` to recover all submitted and running containers on relaunch. In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env by configuring `spark.deploy.recoveryMode` and the related spark.deploy.zookeeper.* configurations.
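To illustrate how the dispatcher UI can assemble such a sandbox link from `spark.mesos.proxy.baseURL`, here is a rough sketch; the function and parameter names are hypothetical and not the actual `MesosClusterPage` code:

```scala
object SandboxUriSketch {
  // Hypothetical helper: builds the Mesos sandbox browse link for one driver row,
  // or returns None when spark.mesos.proxy.baseURL was not configured (in which
  // case the dispatcher UI simply shows an empty sandbox column).
  def sandboxUri(proxyBaseUrl: Option[String],
                 agentId: String,
                 frameworkId: String,
                 driverId: String): Option[String] =
    proxyBaseUrl.map { base =>
      s"$base/#/slaves/$agentId/frameworks/$frameworkId/executors/$driverId/browse"
    }

  def main(args: Array[String]): Unit = {
    println(sandboxUri(Some("http://localhost:5050"), "agent-1", "fw-1", "driver-20170807"))
    println(sandboxUri(None, "agent-1", "fw-1", "driver-20170807")) // None: no link rendered
  }
}
```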
spark git commit: [SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called
Repository: spark Updated Branches: refs/heads/master 39e044e3d -> 534a063f7 [SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called ## What changes were proposed in this pull request? We should reset numRecordsWritten to zero after DiskBlockObjectWriter.commitAndGet is called, because when `revertPartialWritesAndClose` is called, we decrease the written records in `ShuffleWriteMetrics`. However, we decreased the written records to zero, which is wrong; we should only decrease the number of records written since the last `commitAndGet` call. ## How was this patch tested? Modified existing test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Xianyang LiuCloses #18830 from ConeyLiu/DiskBlockObjectWriter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/534a063f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/534a063f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/534a063f Branch: refs/heads/master Commit: 534a063f7c693158437d13224f50d4ae789ff6fb Parents: 39e044e Author: Xianyang Liu Authored: Mon Aug 7 17:04:53 2017 +0800 Committer: Wenchen Fan Committed: Mon Aug 7 17:04:53 2017 +0800 -- .../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 2 ++ .../org/apache/spark/storage/DiskBlockObjectWriterSuite.scala | 1 + 2 files changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index eb3ff92..a024c83 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter( /** * Keep track of number of records written and also use this to periodically * output bytes written since the latter is expensive to do for each record. + * And we reset it after every commitAndGet called. 
*/ private var numRecordsWritten = 0 @@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter( // In certain compression codecs, more bytes are written after streams are closed writeMetrics.incBytesWritten(committedPosition - reportedPosition) reportedPosition = committedPosition + numRecordsWritten = 0 fileSegment } else { new FileSegment(file, committedPosition, 0) http://git-wip-us.apache.org/repos/asf/spark/blob/534a063f/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index bfb3ac4..cea5501 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.revertPartialWritesAndClose() assert(firstSegment.length === file.length()) assert(writeMetrics.bytesWritten === file.length()) +assert(writeMetrics.recordsWritten == 1) } test("calling revertPartialWritesAndClose() after commit() should have no effect") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
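To see why the added reset matters, here is a simplified model of the bookkeeping only, not the real `DiskBlockObjectWriter`: reverting partial writes should subtract only the records written since the last `commitAndGet`, which is exactly what the updated test (one committed record surviving the revert) checks.

```scala
// Simplified model of the record-count bookkeeping this patch fixes.
class WriterModel {
  private var totalRecords = 0L        // plays the role of ShuffleWriteMetrics.recordsWritten
  private var recordsSinceCommit = 0L  // plays the role of numRecordsWritten

  def write(): Unit = { totalRecords += 1; recordsSinceCommit += 1 }

  def commitAndGet(): Unit = {
    // Everything written so far belongs to a committed segment, so the
    // uncommitted counter starts over; this is the line added by the patch.
    recordsSinceCommit = 0
  }

  def revertPartialWritesAndClose(): Unit = {
    // Only roll back what was written since the last commit.
    totalRecords -= recordsSinceCommit
    recordsSinceCommit = 0
  }

  def recordsWritten: Long = totalRecords
}

object WriterModelDemo {
  def main(args: Array[String]): Unit = {
    val w = new WriterModel
    w.write()                       // committed below
    w.commitAndGet()
    w.write()                       // will be reverted
    w.revertPartialWritesAndClose()
    assert(w.recordsWritten == 1)   // mirrors the updated assertion in the suite
  }
}
```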
spark git commit: [SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called
Repository: spark Updated Branches: refs/heads/branch-2.2 098aaec30 -> 7a04def92 [SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called ## What changes were proposed in this pull request? We should reset numRecordsWritten to zero after DiskBlockObjectWriter.commitAndGet is called, because when `revertPartialWritesAndClose` is called, we decrease the written records in `ShuffleWriteMetrics`. However, we decreased the written records to zero, which is wrong; we should only decrease the number of records written since the last `commitAndGet` call. ## How was this patch tested? Modified existing test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Xianyang LiuCloses #18830 from ConeyLiu/DiskBlockObjectWriter. (cherry picked from commit 534a063f7c693158437d13224f50d4ae789ff6fb) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a04def9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a04def9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a04def9 Branch: refs/heads/branch-2.2 Commit: 7a04def920438ef0e08b66a95befeec981e5571e Parents: 098aaec Author: Xianyang Liu Authored: Mon Aug 7 17:04:53 2017 +0800 Committer: Wenchen Fan Committed: Mon Aug 7 17:05:02 2017 +0800 -- .../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala | 2 ++ .../org/apache/spark/storage/DiskBlockObjectWriterSuite.scala | 1 + 2 files changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a04def9/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index eb3ff92..a024c83 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter( /** * Keep track of number of records written and also use this to periodically * output bytes written since the latter is expensive to do for each record. + * And we reset it after every commitAndGet called. 
*/ private var numRecordsWritten = 0 @@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter( // In certain compression codecs, more bytes are written after streams are closed writeMetrics.incBytesWritten(committedPosition - reportedPosition) reportedPosition = committedPosition + numRecordsWritten = 0 fileSegment } else { new FileSegment(file, committedPosition, 0) http://git-wip-us.apache.org/repos/asf/spark/blob/7a04def9/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala index bfb3ac4..cea5501 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { writer.revertPartialWritesAndClose() assert(firstSegment.length === file.length()) assert(writeMetrics.bytesWritten === file.length()) +assert(writeMetrics.recordsWritten == 1) } test("calling revertPartialWritesAndClose() after commit() should have no effect") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org