svn commit: r27391 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_12_00_01-01452ea-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 07:17:35 2018 New Revision: 27391 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_12_00_01-01452ea docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r27394 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_12_02_01-bf58687-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 09:16:47 2018 New Revision: 27394 Log: Apache Spark 2.3.2-SNAPSHOT-2018_06_12_02_01-bf58687 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: docs: fix typo
Repository: spark Updated Branches: refs/heads/master 01452ea9c -> 1d7db65e9 docs: fix typo no => no[t] ## What changes were proposed in this pull request? Fixing a typo. ## How was this patch tested? Visual check of the docs. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tom Saleeba Closes #21496 from tomsaleeba/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d7db65e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d7db65e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d7db65e Branch: refs/heads/master Commit: 1d7db65e968de1c601e7f8b1ec9bc783ef2dbd01 Parents: 01452ea Author: Tom Saleeba Authored: Tue Jun 12 09:22:52 2018 -0500 Committer: Sean Owen Committed: Tue Jun 12 09:22:52 2018 -0500 -- sql/core/src/main/scala/org/apache/spark/sql/Column.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1d7db65e/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 2dbb53e..4eee3de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -104,7 +104,7 @@ class TypedColumn[-T, U]( * * {{{ * df("columnName")// On a specific `df` DataFrame. - * col("columnName") // A generic column no yet associated with a DataFrame. + * col("columnName") // A generic column not yet associated with a DataFrame. * col("columnName.field") // Extracting a struct field * col("`a.column.with.dots`") // Escape `.` in column names. * $"columnName" // Scala short hand for a named column. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
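A minimal Scala sketch of the column-reference styles described in the corrected Scaladoc above (the `people` DataFrame and its columns are illustrative and not part of the commit):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("column-refs").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical two-column DataFrame, only to exercise the reference styles.
val people = Seq(("Alice", 34), ("Bob", 45)).toDF("name", "age")

people.select(people("name")).show()   // column bound to a specific DataFrame
people.select(col("age") + 1).show()   // generic column, not yet associated with a DataFrame
people.select($"name", $"age").show()  // Scala shorthand for a named column
```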
spark git commit: [SPARK-15064][ML] Locale support in StopWordsRemover
Repository: spark Updated Branches: refs/heads/master 1d7db65e9 -> 5d6a53d98 [SPARK-15064][ML] Locale support in StopWordsRemover ## What changes were proposed in this pull request? Add locale support for `StopWordsRemover`. ## How was this patch tested? [Scala|Python] unit tests. Author: Lee Dongjin Closes #21501 from dongjinleekr/feature/SPARK-15064. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d6a53d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d6a53d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d6a53d9 Branch: refs/heads/master Commit: 5d6a53d9831cc1e2115560db5cebe0eea2565dcd Parents: 1d7db65 Author: Lee Dongjin Authored: Tue Jun 12 08:16:37 2018 -0700 Committer: Xiangrui Meng Committed: Tue Jun 12 08:16:37 2018 -0700 -- .../spark/ml/feature/StopWordsRemover.scala | 30 ++-- .../ml/feature/StopWordsRemoverSuite.scala | 51 python/pyspark/ml/feature.py| 30 ++-- python/pyspark/ml/tests.py | 7 +++ 4 files changed, 109 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5d6a53d9/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala index 3fcd84c..0f946dd 100755 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala @@ -17,9 +17,11 @@ package org.apache.spark.ml.feature +import java.util.Locale + import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer -import org.apache.spark.ml.param.{BooleanParam, ParamMap, StringArrayParam} +import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} @@ -84,7 +86,27 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String @Since("1.5.0") def getCaseSensitive: Boolean = $(caseSensitive) - setDefault(stopWords -> StopWordsRemover.loadDefaultStopWords("english"), caseSensitive -> false) + /** + * Locale of the input for case insensitive matching. Ignored when [[caseSensitive]] + * is true. + * Default: Locale.getDefault.toString + * @group param + */ + @Since("2.4.0") + val locale: Param[String] = new Param[String](this, "locale", +"Locale of the input for case insensitive matching. 
Ignored when caseSensitive is true.", + ParamValidators.inArray[String](Locale.getAvailableLocales.map(_.toString))) + + /** @group setParam */ + @Since("2.4.0") + def setLocale(value: String): this.type = set(locale, value) + + /** @group getParam */ + @Since("2.4.0") + def getLocale: String = $(locale) + + setDefault(stopWords -> StopWordsRemover.loadDefaultStopWords("english"), +caseSensitive -> false, locale -> Locale.getDefault.toString) @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { @@ -95,8 +117,8 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String terms.filter(s => !stopWordsSet.contains(s)) } } else { - // TODO: support user locale (SPARK-15064) - val toLower = (s: String) => if (s != null) s.toLowerCase else s + val lc = new Locale($(locale)) + val toLower = (s: String) => if (s != null) s.toLowerCase(lc) else s val lowerStopWords = $(stopWords).map(toLower(_)).toSet udf { terms: Seq[String] => terms.filter(s => !lowerStopWords.contains(toLower(s))) http://git-wip-us.apache.org/repos/asf/spark/blob/5d6a53d9/mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala index 21259a5..20972d1 100755 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala @@ -65,6 +65,57 @@ class StopWordsRemoverSuite extends MLTest with DefaultReadWriteTest { testStopWordsRemover(remover, dataSet) } + test("StopWordsRemover with localed input (case insensitive)") { +val stopWords = Array("milk", "cookie") +val remover = new StopWordsRemover() + .setInputCol("raw") + .setOutputCol("filtered") + .setStopWords(stopWords) + .setCaseS
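A minimal Scala sketch of the new `locale` parameter in use, based on the diff above. The input data and column names are illustrative (they mirror the truncated test), and the "tr" locale string is an assumption: any value passed to `setLocale` must be one of `java.util.Locale.getAvailableLocales`.

```
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stopwords-locale").master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative tokenized input; column names mirror the test above.
val df = Seq((0, Seq("Milk", "and", "Cookies"))).toDF("id", "raw")

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")
  .setStopWords(Array("milk", "cookie"))
  .setCaseSensitive(false)
  .setLocale("tr")  // new in 2.4: locale used for case-insensitive matching

remover.transform(df).show(truncate = false)
```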
svn commit: r27399 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_12_08_01-1d7db65-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 15:16:36 2018 New Revision: 27399 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_12_08_01-1d7db65 docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark-website git commit: Updates to the release guide.
Repository: spark-website Updated Branches: refs/heads/asf-site cea85ce67 -> eb97812f5 Updates to the release guide. - Fix dangerous commands that would flood the ASF repo with requests. - Add instruction for updating release KEYS. - Fix the PyPI instructions (including link to the message on private). - Add some more instructions for updating the web site. - Remove some outdated instructions. - Add SparkR instructions. Author: Marcelo Vanzin Closes #116 from vanzin/rm-fixes. Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/eb97812f Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/eb97812f Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/eb97812f Branch: refs/heads/asf-site Commit: eb97812f59eb86afdfec7fa6351d2ef2b8f935d5 Parents: cea85ce Author: Marcelo Vanzin Authored: Tue Jun 12 09:40:17 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Jun 12 09:40:17 2018 -0700 -- README.md | 6 ++- release-process.md| 89 +++ site/mailing-lists.html | 2 +- site/release-process.html | 95 -- 4 files changed, 92 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/eb97812f/README.md -- diff --git a/README.md b/README.md index 667519b..209e8f8 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,9 @@ In addition to generating the site as HTML from the markdown files, jekyll can s a web server. To build the site and run a web server use the command `jekyll serve` which runs the web server on port 4000, then visit the site at http://localhost:4000. +Please make sure you always run `jekyll build` after testing your changes with `jekyll server`, +otherwise you end up with broken links in a few places. + ## Docs sub-dir The docs are not generated as part of the website. They are built separately for each release @@ -41,5 +44,4 @@ compile phase, use the following syntax: ## Merge PR -To merge pull request, use the merge_pr.py script which also squash the commits. - +To merge pull request, use the `merge_pr.py` script which also squashes the commits. http://git-wip-us.apache.org/repos/asf/spark-website/blob/eb97812f/release-process.md -- diff --git a/release-process.md b/release-process.md index f25a429..e756cc0 100644 --- a/release-process.md +++ b/release-process.md @@ -42,7 +42,7 @@ standard Git branching mechanism and should be announced to the community once t created. It is also good to set up Jenkins jobs for the release branch once it is cut to -ensure tests are passing. These are jobs like +ensure tests are passing. These are jobs like https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-branch-2.3-test-maven-hadoop-2.7/ . Consult Josh Rosen and Shane Knapp for help with this. Also remember to add the newly-added jobs to the test dashboard at https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/ . 
@@ -54,7 +54,7 @@ last RC are marked as `Resolved` and has a `Target Versions` set to this release To track any issue with pending PR targeting this release, create a filter in JIRA with a query like this -`project = SPARK AND "Target Version/s" = "12340470" AND status in (OPEN, "In Progress")` +`project = SPARK AND "Target Version/s" = "12340470" AND status in (Open, Reopened, "In Progress")` For target version string value to use, find the numeric value corresponds to the release by looking into @@ -95,7 +95,7 @@ Instead much of the same release logic can be accessed in `dev/create-release/re ``` # Move dev/ to release/ when the voting is completed. See Finalize the Release below -svn co "https://dist.apache.org/repos/dist/dev/spark"; svn-spark +svn co --depth=files "https://dist.apache.org/repos/dist/dev/spark"; svn-spark # edit svn-spark/KEYS file svn ci --username $ASF_USERNAME --password "$ASF_PASSWORD" -m"Update KEYS" ``` @@ -134,13 +134,15 @@ move the artifacts into the release folder, they cannot be removed.** After the vote passes, to upload the binaries to Apache mirrors, you move the binaries from dev directory (this should be where they are voted) to release directory. This "moving" is the only way you can add stuff to the actual release directory. ``` -# Checkout the Spark directory in Apache distribution SVN "dev" repo -$ svn co https://dist.apache.org/repos/dist/dev/spark/ - # Move the sub-directory in "dev" to the # corresponding directory in "release" $ export SVN_EDITOR=vim $ svn mv https://dist.apache.org/repos/dist/dev/spark/spark-1.1.1-rc2 https://dis
spark git commit: [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite
Repository: spark Updated Branches: refs/heads/master 5d6a53d98 -> 2824f1436 [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite ## What changes were proposed in this pull request? Removing version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite as it is not present anymore in the mirrors and this is blocking all the open PRs. ## How was this patch tested? running UTs Author: Marco Gaido Closes #21540 from mgaido91/SPARK-24531. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2824f143 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2824f143 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2824f143 Branch: refs/heads/master Commit: 2824f1436bb0371b7216730455f02456ef8479ce Parents: 5d6a53d Author: Marco Gaido Authored: Tue Jun 12 09:56:35 2018 -0700 Committer: Xiao Li Committed: Tue Jun 12 09:56:35 2018 -0700 -- .../apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2824f143/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index ea86ab9..6f904c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -195,7 +195,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.0.2", "2.1.2", "2.2.0", "2.2.1", "2.3.0") + val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1", "2.3.0") protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite
Repository: spark Updated Branches: refs/heads/branch-2.3 bf5868757 -> 63e1da162 [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite ## What changes were proposed in this pull request? Removing version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite as it is not present anymore in the mirrors and this is blocking all the open PRs. ## How was this patch tested? running UTs Author: Marco Gaido Closes #21540 from mgaido91/SPARK-24531. (cherry picked from commit 2824f1436bb0371b7216730455f02456ef8479ce) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63e1da16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63e1da16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63e1da16 Branch: refs/heads/branch-2.3 Commit: 63e1da16294e02affa27ab43a3ef0ae62e0c7672 Parents: bf58687 Author: Marco Gaido Authored: Tue Jun 12 09:56:35 2018 -0700 Committer: Xiao Li Committed: Tue Jun 12 09:56:48 2018 -0700 -- .../apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63e1da16/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index ea86ab9..6f904c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -195,7 +195,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.0.2", "2.1.2", "2.2.0", "2.2.1", "2.3.0") + val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1", "2.3.0") protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite
Repository: spark Updated Branches: refs/heads/branch-2.2 c306a8461 -> bf0b21298 [SPARK-24531][TESTS] Remove version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite Removing version 2.2.0 from testing versions in HiveExternalCatalogVersionsSuite as it is not present anymore in the mirrors and this is blocking all the open PRs. running UTs Author: Marco Gaido Closes #21540 from mgaido91/SPARK-24531. (cherry picked from commit 2824f1436bb0371b7216730455f02456ef8479ce) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf0b2129 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf0b2129 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf0b2129 Branch: refs/heads/branch-2.2 Commit: bf0b212987375223ee234b5e532fae4705d5cad7 Parents: c306a84 Author: Marco Gaido Authored: Tue Jun 12 09:56:35 2018 -0700 Committer: Xiao Li Committed: Tue Jun 12 09:58:29 2018 -0700 -- .../apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bf0b2129/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 2b37047..92c27e8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -160,7 +160,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.0.2", "2.1.2", "2.2.0") + val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1") protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r27404 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_12_10_01-63e1da1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 17:15:46 2018 New Revision: 27404 Log: Apache Spark 2.3.2-SNAPSHOT-2018_06_12_10_01-63e1da1 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24416] Fix configuration specification for killBlacklisted executors
Repository: spark Updated Branches: refs/heads/master 2824f1436 -> 3af1d3e6d [SPARK-24416] Fix configuration specification for killBlacklisted executors ## What changes were proposed in this pull request? spark.blacklist.killBlacklistedExecutors is defined as (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, executors when they are blacklisted. Note that, when an entire node is added to the blacklist, all of the executors on that node will be killed. I presume the killing of blacklisted executors only happens after the stage completes successfully and all tasks have completed or on fetch failures (updateBlacklistForFetchFailure/updateBlacklistForSuccessfulTaskSet). It is confusing because the definition states that the executor will be attempted to be recreated as soon as it is blacklisted. This is not true while the stage is in progress and an executor is blacklisted, it will not attempt to cleanup until the stage finishes. Author: Sanket Chintapalli Closes #21475 from redsanket/SPARK-24416. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3af1d3e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3af1d3e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3af1d3e6 Branch: refs/heads/master Commit: 3af1d3e6d95719e15a997877d5ecd3bb40c08b9c Parents: 2824f14 Author: Sanket Chintapalli Authored: Tue Jun 12 13:55:08 2018 -0500 Committer: Imran Rashid Committed: Tue Jun 12 13:55:08 2018 -0500 -- docs/configuration.md | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3af1d3e6/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index 5588c37..6aa7878 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1656,9 +1656,10 @@ Apart from these, the following properties are also available, and may be useful spark.blacklist.killBlacklistedExecutors false -(Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, -executors when they are blacklisted. Note that, when an entire node is added to the blacklist, -all of the executors on that node will be killed. +(Experimental) If set to "true", allow Spark to automatically kill the executors +when they are blacklisted on fetch failure or blacklisted for the entire application, +as controlled by spark.blacklist.application.*. Note that, when an entire node is added +to the blacklist, all of the executors on that node will be killed. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
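A hedged sketch of how the clarified settings fit together when building a `SparkConf`. The `spark.blacklist.*` keys come from the documentation quoted above; the threshold value and application name are illustrative assumptions, not recommendations.

```
import org.apache.spark.SparkConf

// Sketch only: values below are illustrative, not tuned recommendations.
val conf = new SparkConf()
  .setAppName("blacklist-demo")
  .set("spark.blacklist.enabled", "true")
  // Kill (and let the cluster manager replace) executors once they are
  // blacklisted for the whole application, per the clarified wording above.
  .set("spark.blacklist.killBlacklistedExecutors", "true")
  // One of the spark.blacklist.application.* settings that controls
  // application-level blacklisting.
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
```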
spark git commit: [SPARK-23931][SQL] Adds arrays_zip function to sparksql
Repository: spark Updated Branches: refs/heads/master 3af1d3e6d -> f0ef1b311 [SPARK-23931][SQL] Adds arrays_zip function to sparksql Signed-off-by: DylanGuedes ## What changes were proposed in this pull request? Addition of arrays_zip function to spark sql functions. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) Unit tests that checks if the results are correct. Author: DylanGuedes Closes #21045 from DylanGuedes/SPARK-23931. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0ef1b31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0ef1b31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0ef1b31 Branch: refs/heads/master Commit: f0ef1b311dd5399290ad6abe4ca491bdb13478f0 Parents: 3af1d3e Author: DylanGuedes Authored: Tue Jun 12 11:57:25 2018 -0700 Committer: Takuya UESHIN Committed: Tue Jun 12 11:57:25 2018 -0700 -- python/pyspark/sql/functions.py | 17 ++ .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/collectionOperations.scala | 166 +++ .../CollectionExpressionsSuite.scala| 86 ++ .../scala/org/apache/spark/sql/functions.scala | 8 + .../spark/sql/DataFrameFunctionsSuite.scala | 47 ++ 6 files changed, 325 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0ef1b31/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 1759195..0715297 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2394,6 +2394,23 @@ def array_repeat(col, count): return Column(sc._jvm.functions.array_repeat(_to_java_column(col), count)) +@since(2.4) +def arrays_zip(*cols): +""" +Collection function: Returns a merged array of structs in which the N-th struct contains all +N-th values of input arrays. + +:param cols: columns of arrays to be merged. 
+ +>>> from pyspark.sql.functions import arrays_zip +>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2']) +>>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect() +[Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column))) + + # User Defined Function -- class PandasUDFType(object): http://git-wip-us.apache.org/repos/asf/spark/blob/f0ef1b31/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 49fb35b..3c0b728 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -423,6 +423,7 @@ object FunctionRegistry { expression[Size]("size"), expression[Slice]("slice"), expression[Size]("cardinality"), +expression[ArraysZip]("arrays_zip"), expression[SortArray]("sort_array"), expression[ArrayMin]("array_min"), expression[ArrayMax]("array_max"), http://git-wip-us.apache.org/repos/asf/spark/blob/f0ef1b31/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 176995a..d76f301 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -128,6 +128,172 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """ +_FUNC_(a1, a2, ...) - Returns a merged array of structs in which the N-th struct contains all +N-th values of input arrays. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + ""
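A Scala counterpart to the Python doctest above, assuming the `functions.scala` entry added by this commit exposes `arrays_zip` over `Column` arguments, as the SQL and Python examples suggest. The DataFrame below is illustrative.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.arrays_zip

val spark = SparkSession.builder().appName("arrays-zip-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Same data as the Python doctest above, expressed through the Scala API.
val df = Seq((Seq(1, 2, 3), Seq(2, 3, 4))).toDF("vals1", "vals2")

// Element-wise zip into an array of structs; expect [[1, 2], [2, 3], [3, 4]].
df.select(arrays_zip($"vals1", $"vals2").as("zipped")).show(truncate = false)
```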
spark git commit: [SPARK-24216][SQL] Spark TypedAggregateExpression uses getSimpleName that is not safe in scala
Repository: spark Updated Branches: refs/heads/master f0ef1b311 -> cc88d7fad [SPARK-24216][SQL] Spark TypedAggregateExpression uses getSimpleName that is not safe in scala ## What changes were proposed in this pull request? When user create a aggregator object in scala and pass the aggregator to Spark Dataset's agg() method, Spark's will initialize TypedAggregateExpression with the nodeName field as aggregator.getClass.getSimpleName. However, getSimpleName is not safe in scala environment, depending on how user creates the aggregator object. For example, if the aggregator class full qualified name is "com.my.company.MyUtils$myAgg$2$", the getSimpleName will throw java.lang.InternalError "Malformed class name". This has been reported in scalatest https://github.com/scalatest/scalatest/pull/1044 and discussed in many scala upstream jiras such as SI-8110, SI-5425. To fix this issue, we follow the solution in https://github.com/scalatest/scalatest/pull/1044 to add safer version of getSimpleName as a util method, and TypedAggregateExpression will invoke this util method rather than getClass.getSimpleName. ## How was this patch tested? added unit test Author: Fangshi Li Closes #21276 from fangshil/SPARK-24216. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc88d7fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc88d7fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc88d7fa Branch: refs/heads/master Commit: cc88d7fad16e8b5cbf7b6b9bfe412908782b4a45 Parents: f0ef1b3 Author: Fangshi Li Authored: Tue Jun 12 12:10:08 2018 -0700 Committer: Wenchen Fan Committed: Tue Jun 12 12:10:08 2018 -0700 -- .../org/apache/spark/util/AccumulatorV2.scala | 6 +- .../scala/org/apache/spark/util/Utils.scala | 59 +++- .../org/apache/spark/util/UtilsSuite.scala | 16 ++ .../apache/spark/ml/util/Instrumentation.scala | 5 +- .../aggregate/TypedAggregateExpression.scala| 5 +- .../v2/DataSourceV2StringFormat.scala | 4 +- 6 files changed, 89 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc88d7fa/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 3b469a6..bf618b4 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -200,10 +200,12 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { } override def toString: String = { +// getClass.getSimpleName can cause Malformed class name error, +// call safer `Utils.getSimpleName` instead if (metadata == null) { - "Un-registered Accumulator: " + getClass.getSimpleName + "Un-registered Accumulator: " + Utils.getSimpleName(getClass) } else { - getClass.getSimpleName + s"(id: $id, name: $name, value: $value)" + Utils.getSimpleName(getClass) + s"(id: $id, name: $name, value: $value)" } } } http://git-wip-us.apache.org/repos/asf/spark/blob/cc88d7fa/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f9191a5..7428db2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.io._ import java.lang.{Byte => JByte} +import java.lang.InternalError import 
java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo} import java.lang.reflect.InvocationTargetException import java.math.{MathContext, RoundingMode} @@ -1820,7 +1821,7 @@ private[spark] object Utils extends Logging { /** Return the class name of the given object, removing all dollar signs */ def getFormattedClassName(obj: AnyRef): String = { -obj.getClass.getSimpleName.replace("$", "") +getSimpleName(obj.getClass).replace("$", "") } /** @@ -2715,6 +2716,62 @@ private[spark] object Utils extends Logging { HashCodes.fromBytes(secretBytes).toString() } + /** + * Safer than Class obj's getSimpleName which may throw Malformed class name error in scala. + * This method mimicks scalatest's getSimpleNameOfAnObjectsClass. + */ + def getSimpleName(cls: Class[_]): String = { +try { + return cls.getSimpleName +} catch { + case err: InternalError => return stripDollars(stripPackages(cls.getName)) +} + } + + /** + * Remove the pa
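A REPL-style Scala sketch of the failure mode and fallback described above. It approximates, rather than reproduces, the `Utils.getSimpleName` helper added by the commit, and whether `getSimpleName` actually throws depends on the Scala and JVM versions in use.

```
// Sketch only: on some Scala/JVM combinations, Class.getSimpleName throws
// java.lang.InternalError("Malformed class name") for deeply nested Scala
// classes with names such as "com.my.company.MyUtils$myAgg$2$".
def safeSimpleName(cls: Class[_]): String =
  try {
    cls.getSimpleName
  } catch {
    case _: InternalError =>
      // Fall back to the fully qualified name: drop the package prefix,
      // then keep the last non-empty '$'-separated segment.
      val noPackage = cls.getName.split('.').last
      noPackage.split('$').filter(_.nonEmpty).lastOption.getOrElse(noPackage)
  }

// A local object defined inside a method is one way such names arise.
def makeAgg(): AnyRef = { object agg; agg }
println(safeSimpleName(makeAgg().getClass))
```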
svn commit: r27409 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_12_12_02-f0ef1b3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 19:19:06 2018 New Revision: 27409 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_12_12_02-f0ef1b3 docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23933][SQL] Add map_from_arrays function
Repository: spark Updated Branches: refs/heads/master cc88d7fad -> ada28f259 [SPARK-23933][SQL] Add map_from_arrays function ## What changes were proposed in this pull request? The PR adds the SQL function `map_from_arrays`. The behavior of the function is based on Presto's `map`. Since SparkSQL already had a `map` function, we prepared the different name for this behavior. This function returns returns a map from a pair of arrays for keys and values. ## How was this patch tested? Added UTs Author: Kazuaki Ishizaki Closes #21258 from kiszk/SPARK-23933. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ada28f25 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ada28f25 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ada28f25 Branch: refs/heads/master Commit: ada28f25955a9e8ddd182ad41b2a4ef278f3d809 Parents: cc88d7f Author: Kazuaki Ishizaki Authored: Tue Jun 12 12:31:22 2018 -0700 Committer: Takuya UESHIN Committed: Tue Jun 12 12:31:22 2018 -0700 -- python/pyspark/sql/functions.py | 19 ++ .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/complexTypeCreator.scala| 72 +++- .../catalyst/expressions/ComplexTypeSuite.scala | 44 .../scala/org/apache/spark/sql/functions.scala | 11 +++ .../spark/sql/DataFrameFunctionsSuite.scala | 30 6 files changed, 176 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ada28f25/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 0715297..1cdbb8a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1819,6 +1819,25 @@ def create_map(*cols): return Column(jc) +@since(2.4) +def map_from_arrays(col1, col2): +"""Creates a new map from two arrays. + +:param col1: name of column containing a set of keys. All elements should not be null +:param col2: name of column containing a set of values + +>>> df = spark.createDataFrame([([2, 5], ['a', 'b'])], ['k', 'v']) +>>> df.select(map_from_arrays(df.k, df.v).alias("map")).show() +++ +| map| +++ +|[2 -> a, 5 -> b]| +++ +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.map_from_arrays(_to_java_column(col1), _to_java_column(col2))) + + @since(1.4) def array(*cols): """Creates a new array column. 
http://git-wip-us.apache.org/repos/asf/spark/blob/ada28f25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 3c0b728..3700c63 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -417,6 +417,7 @@ object FunctionRegistry { expression[CreateMap]("map"), expression[CreateNamedStruct]("named_struct"), expression[ElementAt]("element_at"), +expression[MapFromArrays]("map_from_arrays"), expression[MapKeys]("map_keys"), expression[MapValues]("map_values"), expression[MapEntries]("map_entries"), http://git-wip-us.apache.org/repos/asf/spark/blob/ada28f25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala index a9867aa..0a5f8a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, TypeUtils} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.array.ByteArr
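A Scala counterpart to the Python doctest above, assuming the `functions.scala` entry added by this commit takes a key column and a value column. The DataFrame below is illustrative; as the docstring notes, the key array must not contain nulls.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.map_from_arrays

val spark = SparkSession.builder().appName("map-from-arrays-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Same shape as the Python doctest above.
val df = Seq((Seq(2, 5), Seq("a", "b"))).toDF("k", "v")

// Expect a single map column such as [2 -> a, 5 -> b].
df.select(map_from_arrays($"k", $"v").as("map")).show(truncate = false)
```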
spark git commit: [SPARK-23010][BUILD][FOLLOWUP] Fix java checkstyle failure of kubernetes-integration-tests
Repository: spark Updated Branches: refs/heads/master ada28f259 -> 0d3714d22 [SPARK-23010][BUILD][FOLLOWUP] Fix java checkstyle failure of kubernetes-integration-tests ## What changes were proposed in this pull request? Fix java checkstyle failure of kubernetes-integration-tests ## How was this patch tested? Checked manually on my local environment. Author: Xingbo Jiang Closes #21545 from jiangxb1987/k8s-checkstyle. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3714d2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3714d2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3714d2 Branch: refs/heads/master Commit: 0d3714d221460a2a1141134c3d451f18c4e0d46f Parents: ada28f2 Author: Xingbo Jiang Authored: Tue Jun 12 15:57:43 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Jun 12 15:57:43 2018 -0700 -- project/SparkBuild.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d3714d2/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index adc2b6b..b606f93 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -57,11 +57,11 @@ object BuildCommons { val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn, streamingFlumeSink, streamingFlume, streamingKafka, sparkGangliaLgpl, streamingKinesisAsl, -dockerIntegrationTests, hadoopCloud) = +dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) = Seq("kubernetes", "mesos", "yarn", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", - "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _)) + "docker-integration-tests", "hadoop-cloud", "kubernetes-integration-tests").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r27412 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_12_16_01-0d3714d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jun 12 23:15:35 2018 New Revision: 27412 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_12_16_01-0d3714d docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 parts, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24506][UI] Add UI filters to tabs added after binding
Repository: spark Updated Branches: refs/heads/branch-2.3 63e1da162 -> a55de387d [SPARK-24506][UI] Add UI filters to tabs added after binding ## What changes were proposed in this pull request? Currently, `spark.ui.filters` are not applied to the handlers added after binding the server. This means that every page which is added after starting the UI will not have the filters configured on it. This can allow unauthorized access to the pages. The PR adds the filters also to the handlers added after the UI starts. ## How was this patch tested? manual tests (without the patch, starting the thriftserver with `--conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"` you can access `http://localhost:4040/sqlserver`; with the patch, 401 is the response as for the other pages). Author: Marco Gaido Closes #21523 from mgaido91/SPARK-24506. (cherry picked from commit f53818d35bdef5d20a2718b14a2fed4c468545c6) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a55de387 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a55de387 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a55de387 Branch: refs/heads/branch-2.3 Commit: a55de387db901975e68e88c68ec464d49f240270 Parents: 63e1da1 Author: Marco Gaido Authored: Tue Jun 12 16:42:44 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Jun 12 16:42:56 2018 -0700 -- .../org/apache/spark/deploy/history/HistoryServer.scala | 1 - core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 8 +--- 2 files changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a55de387/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 611fa56..0654042 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -152,7 +152,6 @@ class HistoryServer( assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") handlers.synchronized { ui.getHandlers.foreach(attachHandler) - addFilters(ui.getHandlers, conf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a55de387/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 0e8a630..ba98fa1 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -263,7 +263,7 @@ private[spark] object JettyUtils extends Logging { filters.foreach { case filter : String => if (!filter.isEmpty) { - logInfo("Adding filter: " + filter) + logInfo(s"Adding filter $filter to ${handlers.map(_.getContextPath).mkString(", ")}.") val holder : FilterHolder = new FilterHolder() holder.setClassName(filter) // Get any parameters for each filter @@ -406,7 +406,7 @@ private[spark] object JettyUtils extends Logging { } pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) - ServerInfo(server, httpPort, securePort, collection) + ServerInfo(server, httpPort, securePort, conf, collection) } catch { case e: Exception => server.stop() @@ -506,10 +506,12 @@ 
private[spark] case class ServerInfo( server: Server, boundPort: Int, securePort: Option[Int], +conf: SparkConf, private val rootHandler: ContextHandlerCollection) { - def addHandler(handler: ContextHandler): Unit = { + def addHandler(handler: ServletContextHandler): Unit = { handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME)) +JettyUtils.addFilters(Seq(handler), conf) rootHandler.addHandler(handler) if (!handler.isStarted()) { handler.start() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
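For reference, the `--conf` flags quoted in the test description above translate to the following programmatic sketch. The filter class and the `type=simple` parameters come from the commit message; the application name is made up for illustration.

```
import org.apache.spark.SparkConf

// Sketch only: same settings as the --conf flags quoted in the commit message.
val filterClass = "org.apache.hadoop.security.authentication.server.AuthenticationFilter"
val conf = new SparkConf()
  .setAppName("filtered-ui")
  .set("spark.ui.filters", filterClass)
  .set(s"spark.$filterClass.params", "type=simple")
```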
spark git commit: [SPARK-24506][UI] Add UI filters to tabs added after binding
Repository: spark Updated Branches: refs/heads/master 0d3714d22 -> f53818d35 [SPARK-24506][UI] Add UI filters to tabs added after binding ## What changes were proposed in this pull request? Currently, `spark.ui.filters` are not applied to the handlers added after binding the server. This means that every page which is added after starting the UI will not have the filters configured on it. This can allow unauthorized access to the pages. The PR adds the filters also to the handlers added after the UI starts. ## How was this patch tested? manual tests (without the patch, starting the thriftserver with `--conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"` you can access `http://localhost:4040/sqlserver`; with the patch, 401 is the response as for the other pages). Author: Marco Gaido Closes #21523 from mgaido91/SPARK-24506. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f53818d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f53818d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f53818d3 Branch: refs/heads/master Commit: f53818d35bdef5d20a2718b14a2fed4c468545c6 Parents: 0d3714d Author: Marco Gaido Authored: Tue Jun 12 16:42:44 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Jun 12 16:42:44 2018 -0700 -- .../org/apache/spark/deploy/history/HistoryServer.scala | 1 - core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 8 +--- 2 files changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f53818d3/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a9a4d5a..066275e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -152,7 +152,6 @@ class HistoryServer( assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") handlers.synchronized { ui.getHandlers.foreach(attachHandler) - addFilters(ui.getHandlers, conf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f53818d3/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index d6a025a..52a9551 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -263,7 +263,7 @@ private[spark] object JettyUtils extends Logging { filters.foreach { case filter : String => if (!filter.isEmpty) { - logInfo("Adding filter: " + filter) + logInfo(s"Adding filter $filter to ${handlers.map(_.getContextPath).mkString(", ")}.") val holder : FilterHolder = new FilterHolder() holder.setClassName(filter) // Get any parameters for each filter @@ -407,7 +407,7 @@ private[spark] object JettyUtils extends Logging { } pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) - ServerInfo(server, httpPort, securePort, collection) + ServerInfo(server, httpPort, securePort, conf, collection) } catch { case e: Exception => server.stop() @@ -507,10 +507,12 @@ private[spark] case class ServerInfo( server: Server, boundPort: Int, securePort: Option[Int], +conf: 
SparkConf, private val rootHandler: ContextHandlerCollection) { - def addHandler(handler: ContextHandler): Unit = { + def addHandler(handler: ServletContextHandler): Unit = { handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME)) +JettyUtils.addFilters(Seq(handler), conf) rootHandler.addHandler(handler) if (!handler.isStarted()) { handler.start() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24506][UI] Add UI filters to tabs added after binding
Repository: spark Updated Branches: refs/heads/branch-2.2 bf0b21298 -> 1f81ade0c [SPARK-24506][UI] Add UI filters to tabs added after binding Currently, `spark.ui.filters` are not applied to the handlers added after binding the server. This means that every page which is added after starting the UI will not have the filters configured on it. This can allow unauthorized access to the pages. The PR adds the filters also to the handlers added after the UI starts. manual tests (without the patch, starting the thriftserver with `--conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"` you can access `http://localhost:4040/sqlserver`; with the patch, 401 is the response as for the other pages). Author: Marco Gaido Closes #21523 from mgaido91/SPARK-24506. (cherry picked from commit f53818d35bdef5d20a2718b14a2fed4c468545c6) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f81ade0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f81ade0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f81ade0 Branch: refs/heads/branch-2.2 Commit: 1f81ade0c7e232f25b39fc6157f63ea91722c829 Parents: bf0b212 Author: Marco Gaido Authored: Tue Jun 12 16:42:44 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Jun 12 16:43:53 2018 -0700 -- .../org/apache/spark/deploy/history/HistoryServer.scala | 1 - core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 8 +--- 2 files changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f81ade0/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d9c8fda..967cf14 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -151,7 +151,6 @@ class HistoryServer( completed: Boolean) { assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") ui.getHandlers.foreach(attachHandler) -addFilters(ui.getHandlers, conf) } /** Detach a reconstructed UI from this server. Only valid after bind(). 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/1f81ade0/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 7df1de5..3e0b62d 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -250,7 +250,7 @@ private[spark] object JettyUtils extends Logging { filters.foreach { case filter : String => if (!filter.isEmpty) { - logInfo("Adding filter: " + filter) + logInfo(s"Adding filter $filter to ${handlers.map(_.getContextPath).mkString(", ")}.") val holder : FilterHolder = new FilterHolder() holder.setClassName(filter) // Get any parameters for each filter @@ -393,7 +393,7 @@ private[spark] object JettyUtils extends Logging { } pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) - ServerInfo(server, httpPort, securePort, collection) + ServerInfo(server, httpPort, securePort, conf, collection) } catch { case e: Exception => server.stop() @@ -492,10 +492,12 @@ private[spark] case class ServerInfo( server: Server, boundPort: Int, securePort: Option[Int], +conf: SparkConf, private val rootHandler: ContextHandlerCollection) { - def addHandler(handler: ContextHandler): Unit = { + def addHandler(handler: ServletContextHandler): Unit = { handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME)) +JettyUtils.addFilters(Seq(handler), conf) rootHandler.addHandler(handler) if (!handler.isStarted()) { handler.start() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23754][PYTHON][FOLLOWUP][BACKPORT-2.3] Move UDF stop iteration wrapping from driver to executor
Repository: spark Updated Branches: refs/heads/branch-2.3 a55de387d -> 470cacd49 [SPARK-23754][PYTHON][FOLLOWUP][BACKPORT-2.3] Move UDF stop iteration wrapping from driver to executor SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user function, but this required a hack to save its argspec. This PR reverts this change and fixes the `StopIteration` bug in the worker. The root of the problem is that when an user-supplied function raises a `StopIteration`, pyspark might stop processing data, if this function is used in a for-loop. The solution is to catch `StopIteration`s exceptions and re-raise them as `RuntimeError`s, so that the execution fails and the error is reported to the user. This is done using the `fail_on_stopiteration` wrapper, in different ways depending on where the function is used: - In RDDs, the user function is wrapped in the driver, because this function is also called in the driver itself. - In SQL UDFs, the function is wrapped in the worker, since all processing happens there. Moreover, the worker needs the signature of the user function, which is lost when wrapping it, but passing this signature to the worker requires a not so nice hack. HyukjinKwon Author: edorigatti Author: e-dorigatti Closes #21538 from e-dorigatti/branch-2.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/470cacd4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/470cacd4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/470cacd4 Branch: refs/heads/branch-2.3 Commit: 470cacd4982ca369ffd294ee37abfa1864d39967 Parents: a55de38 Author: edorigatti Authored: Wed Jun 13 09:06:06 2018 +0800 Committer: hyukjinkwon Committed: Wed Jun 13 09:06:06 2018 +0800 -- python/pyspark/sql/tests.py | 54 python/pyspark/sql/udf.py | 4 +-- python/pyspark/tests.py | 37 --- python/pyspark/util.py | 2 +- python/pyspark/worker.py| 11 +--- 5 files changed, 70 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/470cacd4/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 818ba83..aa7d8eb 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -853,22 +853,6 @@ class SQLTests(ReusedSQLTestCase): self.assertEqual(f, f_.func) self.assertEqual(return_type, f_.returnType) -def test_stopiteration_in_udf(self): -# test for SPARK-23754 -from pyspark.sql.functions import udf -from py4j.protocol import Py4JJavaError - -def foo(x): -raise StopIteration() - -with self.assertRaises(Py4JJavaError) as cm: -self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show() - -self.assertIn( -"Caught StopIteration thrown from user's code; failing the task", -cm.exception.java_exception.toString() -) - def test_validate_column_types(self): from pyspark.sql.functions import udf, to_json from pyspark.sql.column import _to_java_column @@ -3917,6 +3901,44 @@ class PandasUDFTests(ReusedSQLTestCase): def foo(k, v): return k +def test_stopiteration_in_udf(self): +from pyspark.sql.functions import udf, pandas_udf, PandasUDFType +from py4j.protocol import Py4JJavaError + +def foo(x): +raise StopIteration() + +def foofoo(x, y): +raise StopIteration() + +exc_message = "Caught StopIteration thrown from user's code; failing the task" +df = self.spark.range(0, 100) + +# plain udf (test for SPARK-23754) +self.assertRaisesRegexp( +Py4JJavaError, +exc_message, +df.withColumn('v', udf(foo)('id')).collect +) + +# pandas scalar udf +self.assertRaisesRegexp( 
+Py4JJavaError, +exc_message, +df.withColumn( +'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id') +).collect +) + +# pandas grouped map +self.assertRaisesRegexp( +Py4JJavaError, +exc_message, +df.groupBy('id').apply( +pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP) +).collect +) + @unittest.skipIf( not _have_pandas or not _have_pyarrow, http://git-wip-us.apache.org/repos/asf/spark/blob/470cacd4/python/pyspark/sql/udf.py -- diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 7d813af..671e568 100
spark git commit: [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames
Repository: spark Updated Branches: refs/heads/master f53818d35 -> 9786ce66c [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames ## What changes were proposed in this pull request? This PR enables using grouped aggregate pandas UDFs as window functions. The semantics are the same as using a SQL aggregation function as a window function.
```
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)
... def mean_udf(v):
...     return v.mean()
>>> w = Window.partitionBy('id')
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+
```
The scope of this PR is somewhat limited: (1) it only supports unbounded windows, which act essentially as a group by, and (2) it only supports aggregation functions, not "transform"-like window functions (n -> n mapping). Both of these are left as future work. In particular, (1) needs careful thinking w.r.t. how to pass rolling window data to Python efficiently; (2) is a bit easier but does require more changes, so it is better left as a separate PR. ## How was this patch tested? WindowPandasUDFTests Author: Li Jin Closes #21082 from icexelloss/SPARK-22239-window-udf.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9786ce66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9786ce66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9786ce66 Branch: refs/heads/master Commit: 9786ce66c52d41b1d58ddedb3a984f561fd09ff3 Parents: f53818d Author: Li Jin Authored: Wed Jun 13 09:10:52 2018 +0800 Committer: hyukjinkwon Committed: Wed Jun 13 09:10:52 2018 +0800 -- .../apache/spark/api/python/PythonRunner.scala | 2 + python/pyspark/rdd.py | 1 + python/pyspark/sql/functions.py | 34 ++- python/pyspark/sql/tests.py | 238 +++ python/pyspark/worker.py| 20 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 11 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 12 +- .../sql/catalyst/expressions/PythonUDF.scala| 6 +- .../expressions/windowExpressions.scala | 33 ++- .../sql/catalyst/optimizer/Optimizer.scala | 7 +- .../spark/sql/catalyst/planning/patterns.scala | 42 +++- .../spark/sql/execution/SparkPlanner.scala | 1 + .../spark/sql/execution/SparkStrategies.scala | 20 +- .../execution/python/ExtractPythonUDFs.scala| 2 +- .../execution/python/WindowInPandasExec.scala | 173 ++ 15 files changed, 580 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9786ce66/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 41eac10..ebabedf 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -40,6 +40,7 @@ private[spark] object PythonEvalType { val SQL_SCALAR_PANDAS_UDF = 200 val SQL_GROUPED_MAP_PANDAS_UDF = 201 val SQL_GROUPED_AGG_PANDAS_UDF = 202 + val SQL_WINDOW_AGG_PANDAS_UDF = 203 def toString(pythonEvalType: Int): String = pythonEvalType match { case NON_UDF => "NON_UDF" @@ -47,6 +48,7 @@ private[spark] object 
PythonEvalType { case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF" case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF" case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF" +case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF" } } http://git-wip-us.apache.org/repos/asf/spark/blob/9786ce66/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 14d9128..7e7e582 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -74,6 +74,7 @@ class PythonEvalType(object): SQL_SCALAR_PANDAS_UDF = 200 SQL_GROUPED_MAP_PANDAS_UDF = 201 SQL_GROUPED_AGG_PANDAS_UDF = 202 +SQL_WINDOW_AGG_PANDAS_UDF = 203 def portable_hash(x): http://git-wip-us.a
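Because only unbounded frames are supported by this change, an explicitly unbounded frame behaves the same as the default frame used in the example above. A small illustrative sketch, assuming a running SparkSession with pandas and PyArrow available; `max_udf` is a made-up example UDF, not part of the PR:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def max_udf(v):
    # v is a pandas Series holding all values of the window partition
    return v.max()

# Explicitly unbounded frame; bounded frames (e.g. rowsBetween(-1, 1)) are
# outside the scope of this PR and would be rejected at analysis time.
w = (Window.partitionBy("id")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn("max_v", max_udf(df["v"]).over(w)).show()
```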
svn commit: r27413 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_12_18_02-a55de38-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 01:15:30 2018 New Revision: 27413 Log: Apache Spark 2.3.2-SNAPSHOT-2018_06_12_18_02-a55de38 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r27414 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_12_20_01-9786ce6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 03:16:25 2018 New Revision: 27414 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_12_20_01-9786ce6 docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with netcat again
Repository: spark Updated Branches: refs/heads/master 9786ce66c -> 3352d6fe9 [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with netcat again ## What changes were proposed in this pull request? TextSocketMicroBatchReader was no longer compatible with netcat because it launched a temporary reader to read the schema, closed it, and then re-opened a new reader. While a reliable socket server should handle this without any issue, the nc command normally can't handle multiple connections and simply exits when the temporary reader is closed. This patch fixes TextSocketMicroBatchReader to be compatible with netcat again by deferring opening the socket to the first call of planInputPartitions() instead of the constructor. ## How was this patch tested? Added a unit test which fails on the current code and succeeds with the patch; also manually tested. Author: Jungtaek Lim Closes #21497 from HeartSaVioR/SPARK-24466.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3352d6fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3352d6fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3352d6fe Branch: refs/heads/master Commit: 3352d6fe9a1efb6dee18e40bdf584930b10d1d3e Parents: 9786ce6 Author: Jungtaek Lim Authored: Wed Jun 13 12:34:46 2018 +0800 Committer: hyukjinkwon Committed: Wed Jun 13 12:34:46 2018 +0800 -- .../execution/streaming/sources/socket.scala| 7 +- .../sources/TextSocketStreamSuite.scala | 119 --- 2 files changed, 85 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3352d6fe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala index 8240e06..91e3b71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala @@ -22,6 +22,7 @@ import java.net.Socket import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.{Calendar, List => JList, Locale, Optional} +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -76,7 +77,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR @GuardedBy("this") private var lastOffsetCommitted: LongOffset = LongOffset(-1L) - initialize() + private val initialized: AtomicBoolean = new AtomicBoolean(false) /** This method is only used for unit test */ private[sources] def getCurrentOffset(): LongOffset = synchronized { @@ -149,6 +150,10 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR // Internal buffer only holds the batches after lastOffsetCommitted val rawList = synchronized { + if (initialized.compareAndSet(false, true)) { +initialize() + } + val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 batches.slice(sliceStart, sliceEnd) http://git-wip-us.apache.org/repos/asf/spark/blob/3352d6fe/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala index a15a980..52e8386 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.execution.streaming.sources -import java.io.IOException -import java.net.InetSocketAddress +import java.net.{InetSocketAddress, SocketException} import java.nio.ByteBuffer import java.nio.channels.ServerSocketChannel import java.sql.Timestamp @@ -33,9 +32,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} -
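To see the effect of the fix in practice: the socket source no longer opens a connection in the reader's constructor, so the single connection offered by a plain netcat server is not consumed while the temporary reader is created for the schema. A minimal usage sketch from PySpark, assuming `nc -lk 9999` is running on the local machine:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("socket-netcat-demo").getOrCreate()

# The socket is now opened lazily, on the first planInputPartitions() call,
# so a plain nc process survives the temporary reader used for the schema.
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

query = (lines.writeStream
         .format("console")
         .outputMode("append")
         .start())

query.awaitTermination()
```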
spark git commit: [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
Repository: spark Updated Branches: refs/heads/master 3352d6fe9 -> 4c388bccf [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch measures and logs the elapsed time for each operation that communicates with the file system (mostly a remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate any latency issues. ## How was this patch tested? Manually tested. Author: Jungtaek Lim Closes #21506 from HeartSaVioR/SPARK-24485.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c388bcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c388bcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c388bcc Branch: refs/heads/master Commit: 4c388bccf1bcac8f833fd9214096dd164c3ea065 Parents: 3352d6f Author: Jungtaek Lim Authored: Wed Jun 13 12:36:20 2018 +0800 Committer: hyukjinkwon Committed: Wed Jun 13 12:36:20 2018 +0800 -- .../scala/org/apache/spark/util/Utils.scala | 11 ++- .../state/HDFSBackedStateStoreProvider.scala| 83 .../execution/streaming/statefulOperators.scala | 9 +-- 3 files changed, 62 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c388bcc/core/src/main/scala/org/apache/spark/util/Utils.scala --
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7428db2..c139db4 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -31,6 +31,7 @@ import java.nio.file.Files import java.security.SecureRandom import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ +import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.zip.GZIPInputStream @@ -434,7 +435,7 @@ private[spark] object Utils extends Logging { new URI("file:///" + rawFileName).getPath.substring(1) } -/** + /** * Download a file or directory to target directory. Supports fetching the file in a variety of * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based * on the URL parameter. Fetching directories is only supported from Hadoop-compatible @@ -507,6 +508,14 @@ private[spark] object Utils extends Logging { targetFile } + /** Records the duration of running `body`. */ + def timeTakenMs[T](body: => T): (T, Long) = { +val startTime = System.nanoTime() +val result = body +val endTime = System.nanoTime() +(result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0)) + } + /** * Download `in` to `tempFile`, then move it to `destFile`. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/4c388bcc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index df722b9..118c82a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -18,12 +18,10 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ -import java.nio.channels.ClosedChannelException import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.util.Random import scala.util.control.NonFatal import com.google.common.io.ByteStreams @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit if (loadedCurrentVersionMap.isDefined) { return loadedCurrentVersionMap.get } -val snapshotCurrentVersionMap = readSnapshotFile(version) -if (snapshotCurrentVersionMap.isDefined) { - synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) } - return snapshotCurrentVersionMap.get -} -// Find the most recent map before this version that we can. -// [SPARK-22305] This must be done iteratively to avoid stack overflow. -var lastAvailableVersion = version -var lastAvailableMap: Option[MapType] = None -while (lastAvailableMap.isEmpty) { - lastAvailableVersion -= 1 +logWarning(s"The state for version $version doesn't exist in loadedMaps. " + + "Readin
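The new `Utils.timeTakenMs` helper in the diff above simply times a by-name block and clamps the result at zero milliseconds. Purely for illustration of the same measure-and-log pattern, here is an analogous helper sketched in Python (an assumption-laden sketch, not part of Spark):

```python
import time
from contextlib import contextmanager

@contextmanager
def time_taken_ms(label, log=print):
    """Run the wrapped block and log its wall-clock duration in milliseconds,
    mirroring the measure-and-log pattern of Utils.timeTakenMs."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = max(int((time.perf_counter() - start) * 1000), 0)
        log("%s took %d ms" % (label, elapsed_ms))

# Example: timing a (hypothetical) file-system operation.
with time_taken_ms("writing snapshot file"):
    time.sleep(0.05)
```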
svn commit: r27415 - in /dev/spark/2.3.2-SNAPSHOT-2018_06_12_22_01-470cacd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 05:16:07 2018 New Revision: 27415 Log: Apache Spark 2.3.2-SNAPSHOT-2018_06_12_22_01-470cacd docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org