spark git commit: [SPARK-17065][SQL] Improve the error message when encountering an incompatible DataSourceRegister
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8f4cacd3a -> 45036327f

[SPARK-17065][SQL] Improve the error message when encountering an incompatible DataSourceRegister

## What changes were proposed in this pull request?

Add an instruction to ask the user to remove or upgrade the incompatible DataSourceRegister in the error message.

## How was this patch tested?

Test command:
```
build/sbt -Dscala-2.10 package
SPARK_SCALA_VERSION=2.10 bin/spark-shell --packages ai.h2o:sparkling-water-core_2.10:1.6.5

scala> Seq(1).toDS().write.format("parquet").save("foo")
```
Before:
```
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.h2o.DefaultSource could not be instantiated
  at java.util.ServiceLoader.fail(ServiceLoader.java:232)
  at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
  at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
  at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
  at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
  ...
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  at java.security.AccessController.doPrivileged(Native Method)
  ...
```
After:
```
java.lang.ClassNotFoundException: Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.h2o.DefaultSource could not be instantiated
  at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:178)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:441)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:213)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:196)
  ...
```

Author: Shixiong Zhu

Closes #14651 from zsxwing/SPARK-17065.
(cherry picked from commit 268b71d0d792f875fcfaec5314862236754a00d6)
Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45036327
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45036327
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45036327

Branch: refs/heads/branch-2.0
Commit: 45036327fdbdb0167b3c53245fce9dc2be67ffe9
Parents: 8f4cacd
Author: Shixiong Zhu
Authored: Mon Aug 15 15:55:32 2016 -0700
Committer: Yin Huai
Committed: Mon Aug 15 15:55:50 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala | 91 +++-
 1 file changed, 52 insertions(+), 39 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/45036327/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f572b93..f5727da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources

-import java.util.ServiceLoader
+import java.util.{ServiceConfigurationError, ServiceLoader}

 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
@@ -123,50 +123,63 @@ case class DataSource(
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

-    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-      // the provider format did not match any given registered aliases
-      case Nil =>
-        try {
-          Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-
spark git commit: [SPARK-17065][SQL] Improve the error message when encountering an incompatible DataSourceRegister
Repository: spark
Updated Branches:
  refs/heads/master fffb0c0d1 -> 268b71d0d

[SPARK-17065][SQL] Improve the error message when encountering an incompatible DataSourceRegister

## What changes were proposed in this pull request?

Add an instruction to ask the user to remove or upgrade the incompatible DataSourceRegister in the error message.

## How was this patch tested?

Test command:
```
build/sbt -Dscala-2.10 package
SPARK_SCALA_VERSION=2.10 bin/spark-shell --packages ai.h2o:sparkling-water-core_2.10:1.6.5

scala> Seq(1).toDS().write.format("parquet").save("foo")
```
Before:
```
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.h2o.DefaultSource could not be instantiated
  at java.util.ServiceLoader.fail(ServiceLoader.java:232)
  at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
  at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
  at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
  at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
  ...
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  at java.security.AccessController.doPrivileged(Native Method)
  ...
```
After:
```
java.lang.ClassNotFoundException: Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.h2o.DefaultSource could not be instantiated
  at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:178)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:441)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:213)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:196)
  ...
```

Author: Shixiong Zhu

Closes #14651 from zsxwing/SPARK-17065.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/268b71d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/268b71d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/268b71d0

Branch: refs/heads/master
Commit: 268b71d0d792f875fcfaec5314862236754a00d6
Parents: fffb0c0
Author: Shixiong Zhu
Authored: Mon Aug 15 15:55:32 2016 -0700
Committer: Yin Huai
Committed: Mon Aug 15 15:55:32 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala | 91 +++-
 1 file changed, 52 insertions(+), 39 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/268b71d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 79024fd..5ad6ae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources

-import java.util.ServiceLoader
+import java.util.{ServiceConfigurationError, ServiceLoader}

 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
@@ -124,50 +124,63 @@ case class DataSource(
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

-    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-      // the provider format did not match any given registered aliases
-      case Nil =>
-        try {
-          Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-            case Success(dataSource) =>
-              // Found the data source using fully qualified path
-
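For context, the core of this fix is catching the `ServiceConfigurationError` that `ServiceLoader` raises when a provider compiled against an older Spark fails to instantiate (typically with a `NoClassDefFoundError` cause, as in the trace above), and rethrowing it as a `ClassNotFoundException` with an actionable message. A minimal sketch of that pattern, with a simplified trait and a hypothetical `loadRegisteredSources` helper rather than the actual `lookupDataSource` code:

```
import java.util.{ServiceConfigurationError, ServiceLoader}
import scala.collection.JavaConverters._

// Simplified stand-in for org.apache.spark.sql.sources.DataSourceRegister.
trait DataSourceRegister { def shortName(): String }

def loadRegisteredSources(loader: ClassLoader): List[DataSourceRegister] = {
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
  try {
    // ServiceLoader instantiates providers lazily, so force iteration here.
    serviceLoader.asScala.toList
  } catch {
    // A provider built against an old Spark (e.g. one referencing the removed
    // org.apache.spark.Logging class) fails with a ServiceConfigurationError
    // whose cause is a NoClassDefFoundError.
    case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
      throw new ClassNotFoundException(
        "Detected an incompatible DataSourceRegister. Please remove the " +
          s"incompatible library from classpath or upgrade it. Error: ${e.getMessage}", e)
  }
}
```

The rethrown exception surfaces at the caller (here, the `save` call) with a short, user-facing remedy instead of a raw service-loader failure.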
spark git commit: [SPARK-16700][PYSPARK][SQL] create DataFrame from dict/Row with schema
Repository: spark
Updated Branches:
  refs/heads/master 5da6c4b24 -> fffb0c0d1

[SPARK-16700][PYSPARK][SQL] create DataFrame from dict/Row with schema

## What changes were proposed in this pull request?

In 2.0, we verify the data type against the schema for every row for safety, but at a performance cost. This PR makes that verification optional.

When we verify the data type for StructType, it does not support all the types we support in schema inference (for example, dict); this PR fixes that to make them consistent.

For a Row object created using named arguments, the fields are sorted by name, so their order may differ from the order in the provided schema; this PR fixes that by ignoring the order of fields in this case.

## How was this patch tested?

Created regression tests for them.

Author: Davies Liu

Closes #14469 from davies/py_dict.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fffb0c0d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fffb0c0d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fffb0c0d

Branch: refs/heads/master
Commit: fffb0c0d19a2444e7554dfe6b27de0c086112b17
Parents: 5da6c4b
Author: Davies Liu
Authored: Mon Aug 15 12:41:27 2016 -0700
Committer: Josh Rosen
Committed: Mon Aug 15 12:41:27 2016 -0700

--
 python/pyspark/sql/context.py |  8 ++--
 python/pyspark/sql/session.py | 29 +
 python/pyspark/sql/tests.py   | 16 
 python/pyspark/sql/types.py   | 37 +++--
 4 files changed, 62 insertions(+), 28 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fffb0c0d/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 4085f16..7482be8 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -215,7 +215,7 @@ class SQLContext(object):

     @since(1.3)
     @ignore_unicode_prefix
-    def createDataFrame(self, data, schema=None, samplingRatio=None):
+    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
         """
         Creates a :class:`DataFrame` from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
@@ -245,6 +245,7 @@ class SQLContext(object):
             ``byte`` instead of ``tinyint`` for :class:`pyspark.sql.types.ByteType`. We can also use
             ``int`` as a short name for :class:`pyspark.sql.types.IntegerType`.
         :param samplingRatio: the sample ratio of rows used for inferring
+        :param verifySchema: verify data types of every row against schema.
         :return: :class:`DataFrame`

         .. versionchanged:: 2.0
@@ -253,6 +254,9 @@ class SQLContext(object):
            If it's not a :class:`pyspark.sql.types.StructType`, it will be wrapped into a
            :class:`pyspark.sql.types.StructType` and each record will also be wrapped into a tuple.

+        .. versionchanged:: 2.1
+           Added verifySchema.
+
         >>> l = [('Alice', 1)]
         >>> sqlContext.createDataFrame(l).collect()
         [Row(_1=u'Alice', _2=1)]
@@ -300,7 +304,7 @@ class SQLContext(object):
             ...
             Py4JJavaError: ...
""" -return self.sparkSession.createDataFrame(data, schema, samplingRatio) +return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema) @since(1.3) def registerDataFrameAsTable(self, df, tableName): http://git-wip-us.apache.org/repos/asf/spark/blob/fffb0c0d/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 2dacf48..61fa107 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -384,17 +384,15 @@ class SparkSession(object): if schema is None or isinstance(schema, (list, tuple)): struct = self._inferSchemaFromList(data) +converter = _create_converter(struct) +data = map(converter, data) if isinstance(schema, (list, tuple)): for i, name in enumerate(schema): struct.fields[i].name = name struct.names[i] = name schema = struct -elif isinstance(schema, StructType): -for row in data: -_verify_type(row, schema) - -else: +elif not isinstance(schema, StructType): raise TypeError("schema should be StructType or list or None, but got: %s" % schema) # convert python objects to sql data @@ -403,7 +401,7 @@ class SparkSession(object):
spark git commit: [SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.
Repository: spark
Updated Branches:
  refs/heads/master 564fe614c -> 5da6c4b24

[SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.

Both core and sql have slightly different code that does variable substitution of config values. This change refactors that code and encapsulates the logic of reading config values and expanding variables in a new helper class. The helper can be configured so that both core and sql can use it without losing existing functionality, allows for easier testing, and makes it easier to add more features in the future.

Tested with existing and new unit tests, and by running spark-shell with some configs referencing variables and making sure it behaved as expected.

Author: Marcelo Vanzin

Closes #14468 from vanzin/SPARK-16671.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5da6c4b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5da6c4b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5da6c4b2

Branch: refs/heads/master
Commit: 5da6c4b24f512b63cd4e6ba7dd8968066a9396f5
Parents: 564fe61
Author: Marcelo Vanzin
Authored: Mon Aug 15 11:09:54 2016 -0700
Committer: Marcelo Vanzin
Committed: Mon Aug 15 11:09:54 2016 -0700

--
 .../main/scala/org/apache/spark/SparkConf.scala |   9 +-
 .../spark/internal/config/ConfigEntry.scala     |  92 +++-
 .../spark/internal/config/ConfigProvider.scala  |  74 +
 .../spark/internal/config/ConfigReader.scala    | 106 +++
 .../internal/config/ConfigEntrySuite.scala     |  78 ++
 .../internal/config/ConfigReaderSuite.scala    |  62 +++
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 +-
 .../sql/internal/VariableSubstitution.scala     |  92 +++-
 .../internal/VariableSubstitutionSuite.scala    |  18 
 9 files changed, 312 insertions(+), 228 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/SparkConf.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b6d244b..31b41d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
 import org.apache.avro.{Schema, SchemaNormalization}

 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils

@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

   private val settings = new ConcurrentHashMap[String, String]()

+  private val reader = new ConfigReader(new SparkConfigProvider(settings))
+  reader.bindEnv(new ConfigProvider {
+    override def get(key: String): Option[String] = Option(getenv(key))
+  })
+
   if (loadDefaults) {
     loadFromSystemProperties(false)
   }
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    * - This will throw an exception is the config is not optional and the value is not set.
    */
   private[spark] def get[T](entry: ConfigEntry[T]): T = {
-    entry.readFrom(settings, getenv)
+    entry.readFrom(reader)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index e2e23b3..113037d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf

 /**
  * An entry contains all meta information for a configuration.
  *
- * Config options created using this feature support variable expansion. If the config value
- * contains variable references of the form "${prefix:variableName}", the reference will be replaced
- * with the value of the variable depending on the prefix. The prefix can be one of:
- *
- * - no prefix: if the config key starts with "spark", looks for the value in the Spark config
- * - system: looks for the value in the system properties
- * - env: looks for the value in the environment
- *
- * So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
- *
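The mechanism these classes centralize is resolving references of the form "${prefix:variableName}" against pluggable value sources, as the removed doc comment above describes (no prefix reads the Spark config, "system" reads JVM properties, "env" reads the environment). A toy sketch of that provider/reader split, assuming simplified names (`ConfProvider`, `Substituter`) rather than the actual `ConfigReader` API:

```
import scala.util.matching.Regex

// Each provider resolves keys from one source: conf map, system properties, env, ...
trait ConfProvider { def get(key: String): Option[String] }

class MapProvider(values: Map[String, String]) extends ConfProvider {
  def get(key: String): Option[String] = values.get(key)
}
class SysPropProvider extends ConfProvider {
  def get(key: String): Option[String] = Option(System.getProperty(key))
}
class EnvProvider extends ConfProvider {
  def get(key: String): Option[String] = Option(System.getenv(key))
}

// Resolves ${prefix:name} (or ${name}) references against the bound providers.
class Substituter(default: ConfProvider, bindings: Map[String, ConfProvider]) {
  private val ref: Regex = """\$\{(?:(\w+):)?([^}]+)\}""".r

  def substitute(input: String): String =
    ref.replaceAllIn(input, m => {
      val provider = Option(m.group(1)).flatMap(bindings.get).getOrElse(default)
      provider.get(m.group(2)) match {
        case Some(v) => Regex.quoteReplacement(substitute(v)) // values may nest references
        case None    => Regex.quoteReplacement(m.matched)     // leave unresolved refs alone
      }
    })
}

object SubstituterDemo extends App {
  val reader = new Substituter(
    new MapProvider(Map("spark.master" -> "local[4]")),
    Map("system" -> new SysPropProvider, "env" -> new EnvProvider))
  println(reader.substitute("master=${spark.master}, user=${system:user.name}"))
}
```

Keeping the prefix-to-provider mapping in one place is what lets core and sql share the expansion behavior while each supplies its own default source.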
spark git commit: [SPARK-16508][SPARKR] Split docs for arrange and orderBy methods
Repository: spark
Updated Branches:
  refs/heads/master 3d8bfe7a3 -> 564fe614c

[SPARK-16508][SPARKR] Split docs for arrange and orderBy methods

## What changes were proposed in this pull request?

This PR splits the arrange and orderBy docs according to their functionality (the former for sorting a SparkDataFrame and the latter for a WindowSpec).

## How was this patch tested?

![screen shot 2016-08-06 at 6 39 19 pm](https://cloud.githubusercontent.com/assets/15318264/17459969/51eade28-5c05-11e6-8ca1-8d8a8e344bab.png)
![screen shot 2016-08-06 at 6 39 29 pm](https://cloud.githubusercontent.com/assets/15318264/17459966/51e3c246-5c05-11e6-8d35-3e905ca48676.png)
![screen shot 2016-08-06 at 6 40 02 pm](https://cloud.githubusercontent.com/assets/15318264/17459967/51e650ec-5c05-11e6-8698-0f037f5199ff.png)

Author: Junyang Qian

Closes #14522 from junyangq/SPARK-16508-0.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/564fe614
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/564fe614
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/564fe614

Branch: refs/heads/master
Commit: 564fe614c11deb657e0ac9e6b75e65370c48b7fe
Parents: 3d8bfe7
Author: Junyang Qian
Authored: Mon Aug 15 11:03:03 2016 -0700
Committer: Shivaram Venkataraman
Committed: Mon Aug 15 11:03:03 2016 -0700

--
 .gitignore           |  1 +
 R/pkg/R/DataFrame.R  | 11 +--
 R/pkg/R/WindowSpec.R | 18 ++
 R/pkg/R/generics.R   |  2 +-
 4 files changed, 17 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/564fe614/.gitignore
--
diff --git a/.gitignore b/.gitignore
index 225aa61..0991976 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,3 +82,4 @@ spark-warehouse/

 *.Rproj
 *.Rproj.*
+.Rproj.user

http://git-wip-us.apache.org/repos/asf/spark/blob/564fe614/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0ce4696..09be06d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2048,14 +2048,14 @@ setMethod("rename",

 setClassUnion("characterOrColumn", c("character", "Column"))

-#' Arrange
+#' Arrange Rows by Variables
 #'
 #' Sort a SparkDataFrame by the specified column(s).
 #'
-#' @param x A SparkDataFrame to be sorted.
-#' @param col A character or Column object vector indicating the fields to sort on
-#' @param ... Additional sorting fields
-#' @param decreasing A logical argument indicating sorting order for columns when
+#' @param x a SparkDataFrame to be sorted.
+#' @param col a character or Column object indicating the fields to sort on
+#' @param ... additional sorting fields
+#' @param decreasing a logical argument indicating sorting order for columns when
 #'                   a character vector is specified for col
 #' @return A SparkDataFrame where all elements are sorted.
 #' @family SparkDataFrame functions
@@ -2120,7 +2120,6 @@ setMethod("arrange",
           })

 #' @rdname arrange
-#' @name orderBy
 #' @aliases orderBy,SparkDataFrame,characterOrColumn-method
 #' @export
 #' @note orderBy(SparkDataFrame, characterOrColumn) since 1.4.0

http://git-wip-us.apache.org/repos/asf/spark/blob/564fe614/R/pkg/R/WindowSpec.R
--
diff --git a/R/pkg/R/WindowSpec.R b/R/pkg/R/WindowSpec.R
index 4746380..751ba3f 100644
--- a/R/pkg/R/WindowSpec.R
+++ b/R/pkg/R/WindowSpec.R
@@ -82,16 +82,18 @@ setMethod("partitionBy",
             }
           })

-#' orderBy
+#' Ordering Columns in a WindowSpec
 #'
 #' Defines the ordering columns in a WindowSpec.
-#'
 #' @param x a WindowSpec
-#' @return a WindowSpec
-#' @rdname arrange
+#' @param col a character or Column object indicating an ordering column
+#' @param ... additional sorting fields
+#' @return A WindowSpec.
 #' @name orderBy
+#' @rdname orderBy
 #' @aliases orderBy,WindowSpec,character-method
 #' @family windowspec_method
+#' @seealso See \link{arrange} for use in sorting a SparkDataFrame
 #' @export
 #' @examples
 #' \dontrun{
@@ -105,7 +107,7 @@ setMethod("orderBy",
             windowSpec(callJMethod(x@sws, "orderBy", col, list(...)))
           })

-#' @rdname arrange
+#' @rdname orderBy
 #' @name orderBy
 #' @aliases orderBy,WindowSpec,Column-method
 #' @export
@@ -122,7 +124,7 @@ setMethod("orderBy",

 #' rowsBetween
 #'
 #' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
-#' 
+#'
 #' Both `start` and `end` are relative positions from the current row. For example, "0" means
 #' "current row", while "-1" means the row before the current row, and "5" means the fifth row
spark git commit: [SPARK-16934][ML][MLLIB] Update LogisticCostAggregator serialization code to make it consistent with LinearRegression
Repository: spark
Updated Branches:
  refs/heads/master ddf0d1e3f -> 3d8bfe7a3

[SPARK-16934][ML][MLLIB] Update LogisticCostAggregator serialization code to make it consistent with LinearRegression

## What changes were proposed in this pull request?

Update LogisticCostAggregator serialization code to make it consistent with #14109.

## How was this patch tested?

MLlib 2.0:
![image](https://cloud.githubusercontent.com/assets/19235986/17649601/5e2a79ac-61ee-11e6-833c-3bd8b5250470.png)
After this PR:
![image](https://cloud.githubusercontent.com/assets/19235986/17649599/52b002ae-61ee-11e6-9402-9feb3439880f.png)

Author: WeichenXu

Closes #14520 from WeichenXu123/improve_logistic_regression_costfun.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d8bfe7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d8bfe7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d8bfe7a

Branch: refs/heads/master
Commit: 3d8bfe7a39015c84cf95561fe17eb2808ce44084
Parents: ddf0d1e
Author: WeichenXu
Authored: Mon Aug 15 06:38:30 2016 -0700
Committer: Yanbo Liang
Committed: Mon Aug 15 06:38:30 2016 -0700

--
 .../ml/classification/LogisticRegression.scala | 36 +++-
 1 file changed, 20 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3d8bfe7a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 88d1b45..fce3935 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
@@ -346,8 +347,9 @@ class LogisticRegression @Since("1.2.0") (
     val regParamL1 = $(elasticNetParam) * $(regParam)
     val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
     val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
-      $(standardization), featuresStd, featuresMean, regParamL2)
+      $(standardization), bcFeaturesStd, regParamL2)

     val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
       new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -442,6 +444,7 @@ class LogisticRegression @Since("1.2.0") (
         rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
         i += 1
       }
+      bcFeaturesStd.destroy(blocking = false)

       if ($(fitIntercept)) {
         (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
@@ -938,11 +941,15 @@ class BinaryLogisticRegressionSummary private[classification] (
 * Two LogisticAggregator can be merged together to have a summary of loss and gradient of
 * the corresponding joint dataset.
 *
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
 * @param numClasses the number of possible outcomes for k classes classification problem in
 *                   Multinomial Logistic Regression.
 * @param fitIntercept Whether to fit an intercept term.
 */
private class LogisticAggregator(
+    val bcCoefficients: Broadcast[Vector],
+    val bcFeaturesStd: Broadcast[Array[Double]],
     private val numFeatures: Int,
     numClasses: Int,
     fitIntercept: Boolean) extends Serializable {
@@ -958,14 +965,9 @@ private class LogisticAggregator(
    * of the objective function.
    *
    * @param instance The instance of data point to be added.
-   * @param coefficients The coefficients corresponding to the features.
-   * @param featuresStd The standard deviation values of the features.
    * @return This LogisticAggregator object.
    */
-  def add(
-      instance: Instance,
-      coefficients: Vector,
-      featuresStd: Array[Double]): this.type = {
+  def add(instance: Instance): this.type = {
     instance match {
       case Instance(label, weight, features) =>
         require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
           s" Expecting $numFeatures but got
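The underlying pattern, which this change adopts from the LinearRegression fix it cites, is to broadcast large read-only arrays once per job instead of capturing them in the closure serialized with every task, and to destroy the broadcast when optimization finishes. A rough standalone sketch of that pattern under those assumptions (not the aggregator code itself):

```
import org.apache.spark.sql.SparkSession

object BroadcastPattern {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val featuresStd: Array[Double] = Array(1.0, 2.0, 0.5)
    // Ship the array to executors once; tasks reference the broadcast handle
    // instead of re-serializing the array into every task closure.
    val bcFeaturesStd = sc.broadcast(featuresStd)

    val data = sc.parallelize(Seq(Array(1.0, 4.0, 2.0), Array(3.0, 2.0, 1.0)))
    val scaledSum = data
      .map { features =>
        val std = bcFeaturesStd.value // one deserialized copy per executor
        features.zip(std).map { case (v, s) => if (s != 0.0) v / s else 0.0 }.sum
      }
      .sum()

    println(scaledSum)
    // Release executor-side copies once they are no longer needed.
    bcFeaturesStd.destroy()
    spark.stop()
  }
}
```

With per-iteration optimizers like L-BFGS this matters: without the broadcast, the standardization and coefficient arrays would be shipped with every task of every iteration.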
spark git commit: [TRIVIAL][ML] Fix LogisticRegression typo in error message.
Repository: spark
Updated Branches:
  refs/heads/master 1a028bdef -> ddf0d1e3f

[TRIVIAL][ML] Fix LogisticRegression typo in error message.

## What changes were proposed in this pull request?

Fix ```LogisticRegression``` typo in error message.

## How was this patch tested?

Docs change, no new tests.

Author: Yanbo Liang

Closes #14633 from yanboliang/lr-typo.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ddf0d1e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ddf0d1e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ddf0d1e3

Branch: refs/heads/master
Commit: ddf0d1e3fe18bcd01e1447feea1b76ce86087b3b
Parents: 1a028bd
Author: Yanbo Liang
Authored: Mon Aug 15 10:11:29 2016 +0100
Committer: Sean Owen
Committed: Mon Aug 15 10:11:29 2016 +0100

--
 .../org/apache/spark/ml/classification/LogisticRegression.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ddf0d1e3/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 90baa41..88d1b45 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -303,7 +303,7 @@ class LogisticRegression @Since("1.2.0") (

     val (coefficients, intercept, objectiveHistory) = {
       if (numInvalid != 0) {
-        val msg = s"Classification labels should be in {0 to ${numClasses - 1} " +
+        val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
           s"Found $numInvalid invalid labels."
         logError(msg)
         throw new SparkException(msg)
spark git commit: [SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode
Repository: spark
Updated Branches:
  refs/heads/master 2a3d286f3 -> 1a028bdef

[SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode

- Make the Mesos coarse-grained scheduler accept port offers and pre-assign ports

Previous attempt was for fine-grained mode: https://github.com/apache/spark/pull/10808

Author: Stavros Kontopoulos

Closes #11157 from skonto/honour_ports_coarse.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a028bde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a028bde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a028bde

Branch: refs/heads/master
Commit: 1a028bdefa6312bf0eec46b89a1947da7e9d84af
Parents: 2a3d286
Author: Stavros Kontopoulos
Authored: Mon Aug 15 09:55:32 2016 +0100
Committer: Sean Owen
Committed: Mon Aug 15 09:55:32 2016 +0100

--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   1 +
 .../MesosCoarseGrainedSchedulerBackend.scala    |  59 ++---
 .../cluster/mesos/MesosSchedulerUtils.scala     | 125 ++-
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  42 ++-
 .../mesos/MesosSchedulerUtilsSuite.scala        | 114 -
 .../spark/scheduler/cluster/mesos/Utils.scala   |  20 ++-
 6 files changed, 336 insertions(+), 25 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1a028bde/core/src/main/scala/org/apache/spark/SparkEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index af50a6d..cc8e3fd 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -231,6 +231,7 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
     } else if (rpcEnv.address != null) {
       conf.set("spark.executor.port", rpcEnv.address.port.toString)
+      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
     }

     // Create an instance of the class with the given name, possibly initializing it with our conf

http://git-wip-us.apache.org/repos/asf/spark/blob/1a028bde/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 4a88824..6b9313e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{Buffer, HashMap, HashSet}

 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -71,13 +70,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[String, Int]
+  val coresByTaskId = new mutable.HashMap[String, Int]
   var totalCoresAcquired = 0

   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job. Slaves are never deleted, because
   // we need to maintain e.g. failure state and connection state.
-  private val slaves = new HashMap[String, Slave]
+  private val slaves = new mutable.HashMap[String, Slave]

   /**
    * The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -285,7 +284,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }

   private def declineUnmatchedOffers(
-      d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
+      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
     offers.foreach { offer =>
       declineOffer(d, offer, Some("unmet constraints"),
         Some(rejectOfferDurationForUnmetConstraints))
@@ -302,9 +301,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     val offerAttributes = toAttributeMap(offer.getAttributesList)
     val mem = getResource(offer.getResourcesList, "mem")
     val cpus = getResource(offer.getResourcesList, "cpus")
+    val ports =
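At the heart of honoring port restrictions is checking whether the ports Spark needs (e.g. `spark.executor.port`, `spark.blockManager.port`) fall inside the port ranges a Mesos offer provides, and declining offers that cannot satisfy them. A simplified sketch of that range check under those assumptions; the real logic lives in `MesosSchedulerUtils`, and the names here are illustrative:

```
object PortCheck {
  // A Mesos "ports" resource is a list of inclusive ranges, e.g. (31000, 32000).
  type PortRange = (Long, Long)

  // An offer is usable only if every user-requested port falls inside some
  // offered range; a port of 0 means "let the system pick", so it always matches.
  def portsSatisfied(requested: Seq[Long], offered: Seq[PortRange]): Boolean =
    requested.filter(_ != 0).forall { port =>
      offered.exists { case (lo, hi) => port >= lo && port <= hi }
    }

  def main(args: Array[String]): Unit = {
    val offered = Seq((31000L, 32000L), (4040L, 4040L))
    assert(portsSatisfied(Seq(31500L, 4040L), offered)) // both ports in a range
    assert(!portsSatisfied(Seq(7077L), offered))        // 7077 not offered -> decline
    assert(portsSatisfied(Seq(0L), offered))            // 0: any port is acceptable
  }
}
```

When the check passes, the scheduler also has to carve the matched ports out of the offer's resources when building the task, so Mesos accounts them to the executor.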