[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20416

**[Test build #86727 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86727/testReport)** for PR 20416 at commit [`f2026eb`](https://github.com/apache/spark/commit/f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20369 **[Test build #86735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86735/testReport)** for PR 20369 at commit [`d311d56`](https://github.com/apache/spark/commit/d311d5639b3af9123e0c6dbe38468f0172e06712).
[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20396 **[Test build #86731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86731/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).
[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20409 **[Test build #86729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86729/testReport)** for PR 20409 at commit [`b23ff02`](https://github.com/apache/spark/commit/b23ff02f543ecc92db574b808ea00f9ff7d236f8).
[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20375 **[Test build #86732 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86732/testReport)** for PR 20375 at commit [`caf581f`](https://github.com/apache/spark/commit/caf581f7f171912af4cebbc3a96887c7bb4a87e5).
[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20416 **[Test build #86727 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86727/testReport)** for PR 20416 at commit [`f2026eb`](https://github.com/apache/spark/commit/f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398).
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20146 **[Test build #86734 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86734/testReport)** for PR 20146 at commit [`b884fb5`](https://github.com/apache/spark/commit/b884fb5c0ce1e627390d08d8425721ea8e4d).
[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20403 **[Test build #86730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86730/testReport)** for PR 20403 at commit [`1f4d288`](https://github.com/apache/spark/commit/1f4d2884ba5b56e06427ce3d91cb6ac5f8f2b7b6).
[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20369 **[Test build #86733 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86733/testReport)** for PR 20369 at commit [`d311d56`](https://github.com/apache/spark/commit/d311d5639b3af9123e0c6dbe38468f0172e06712).
[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20414 **[Test build #86728 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20146 Merged build finished. Test PASSed.
[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20415 Can one of the admins verify this patch?
[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20369 Retest this please.
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20146 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/308/ Test PASSed.
[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20396 Merged build finished. Test PASSed.
[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20396 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/307/ Test PASSed.
[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20403 Merged build finished. Test PASSed.
[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20409 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/305/ Test PASSed.
[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20403 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/306/ Test PASSed.
[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20409 Merged build finished. Test PASSed.
[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20414 Merged build finished. Test PASSed.
[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20414 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/304/ Test PASSed.
[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20416 Merged build finished. Test PASSed.
[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20415 Can one of the admins verify this patch?
[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20416 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/303/ Test PASSed.
[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20403 ok to test
[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20403 retest this please
[GitHub] spark issue #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVectorizer
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/20367 LGTM, thanks
[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20375 retest this please
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275764

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,48 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val countingRequired = $(minDF) < 1.0 || $(maxDF) < 1.0
+    val maybeInputSize = if (countingRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.get
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.get
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

Making a variable here for the sake of clarity.
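[Editor's note] The review thread below converges on counting the corpus only when a bound is given as a fraction. A minimal Python sketch of that resolution logic (hypothetical helper name; the authoritative version is the Scala in the diffs):

```python
def resolve_df_bounds(min_df, max_df, count_input):
    """Resolve minDF/maxDF into absolute document-frequency bounds.

    Values >= 1.0 are absolute document counts; values < 1.0 are fractions
    of the corpus size, which is the only case that needs a (cached) count.
    """
    counting_required = min_df < 1.0 or max_df < 1.0
    n = count_input() if counting_required else None  # one pass over the input
    lo = min_df if min_df >= 1.0 else min_df * n
    hi = max_df if max_df >= 1.0 else max_df * n
    if hi < lo:
        raise ValueError("maxDF must be >= minDF.")
    return lo, hi

# e.g. resolve_df_bounds(2.0, 0.8, lambda: 1000) -> (2.0, 800.0)
```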
[GitHub] spark pull request #19575: [SPARK-22221][DOCS] Adding User Documentation for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19575#discussion_r164275737

--- Diff: docs/sql-programming-guide.md ---
@@ -1640,6 +1640,133 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
 
 You may run `./bin/spark-sql --help` for a complete list of all available options.
 
+# PySpark Usage Guide for Pandas with Apache Arrow
+
+## Apache Arrow in Spark
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
+data between JVM and Python processes. This currently is most beneficial to Python users that
+work with Pandas/NumPy data. Its usage is not automatic and might require some minor
+changes to configuration or code to take full advantage and ensure compatibility. This guide will
+give a high-level description of how to use Arrow in Spark and highlight any differences when
+working with Arrow-enabled data.
+
+### Ensure PyArrow Installed
+
+If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
+SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
+is installed and available on all cluster nodes. The current supported version is 0.8.0.
+You can install using pip or conda from the conda-forge channel. See PyArrow
+[installation](https://arrow.apache.org/docs/python/install.html) for details.
+
+## Enabling for Conversion to/from Pandas
+
+Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
+`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
+To use Arrow when executing these calls, users need to first set the Spark configuration
+'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
+
+{% include_example dataframe_with_arrow python/sql/arrow.py %}
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not
+enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
+DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
+data types are currently supported and an error can be raised if a column has an unsupported type,
+see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+Spark will fall back to create the DataFrame without Arrow.
+
+## Pandas UDFs (a.k.a. Vectorized UDFs)
+
+Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
+Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
+or to wrap the function, no additional configuration is required. Currently, there are two types of
+Pandas UDF: Scalar and Group Map.
+
+### Scalar
+
+Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
+as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
+a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then
+concatenating the results together.
+
+The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+
+{% include_example scalar_pandas_udf python/sql/arrow.py %}
+
+### Group Map
+Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
--- End diff --

`Group Map`
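[Editor's note] The `{% include_example %}` body is not shown in this excerpt; below is a minimal sketch of what such a scalar Pandas UDF looks like (illustrative names and data; assumes Spark 2.3+ with PyArrow installed):

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Inputs and output are pandas.Series; Spark calls the function once per batch
# of rows and concatenates the resulting Series.
@pandas_udf("double")
def multiply(a, b):
    return a * b

df = spark.createDataFrame(pd.DataFrame({"a": [1.0, 2.0, 3.0],
                                         "b": [4.0, 5.0, 6.0]}))
df.select(multiply(df.a, df.b).alias("product")).show()  # 4.0, 10.0, 18.0
```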
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275714

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
--- End diff --

Done.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275722

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
    val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
+      allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
+    } else {
+      allWordCounts
+    }
+
+    val wordCounts = maybeFilteredWordCounts
+      .map { case (word, (count, dfCount)) => (word, count) }
--- End diff --

Changed.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275721

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
+      allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
--- End diff --

Changed.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275712

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
--- End diff --

Changed.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275697

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

Changed.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user ymazari commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164275706

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
--- End diff --

Right. I guess I was overdoing it here.
[GitHub] spark issue #20372: [SPARK-23249] Improved block merging logic for partition...
Github user glentakahashi commented on the issue: https://github.com/apache/spark/pull/20372 Created https://issues.apache.org/jira/browse/SPARK-23249
[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20396

**[Test build #4080 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4080/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164273743

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

yes, this is the right condition to choose whether to `filter` or not, but the right one to choose whether to do the count or not is the one you just stated
[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20396 **[Test build #4080 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4080/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164273656

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

I imagine you just want the same conditions that are used below, like `$(minDF) >= 1.0`
[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20416 cc @yanboliang, @ueshin, @viirya and @MLnick, could you guys check if it makes sense to you when you are available?
[GitHub] spark pull request #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module d...
GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/20416

    [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrings to the top in PySpark examples

## What changes were proposed in this pull request?

This PR proposes to relocate the docstrings in modules of examples to the top. Seems these are mistakes. So, for example, the below codes

```python
>>> help(aft_survival_regression)
```

shows the module docstrings for examples as below:

**Before**

```
Help on module aft_survival_regression:

NAME
    aft_survival_regression

...

DESCRIPTION
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    ...
(END)
```

**After**

```
Help on module aft_survival_regression:

NAME
    aft_survival_regression

...

DESCRIPTION
    An example demonstrating aft survival regression.
    Run with:
      bin/spark-submit examples/src/main/python/ml/aft_survival_regression.py
(END)
```

## How was this patch tested?

Manually checked.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark module-docstring-example

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20416.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20416

commit f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398
Author: hyukjinkwon
Date: 2018-01-27T12:05:26Z

    Relocate module docstrings to the top in PySpark examples
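[Editor's note] For context, a minimal sketch of the layout the PR moves the examples to (illustrative file contents, not the actual diff): a string literal only becomes the module docstring, and therefore what `help()` shows, when it is the first statement in the file; comments such as the license header may precede it.

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# ... (license comment block; plain comments may stay above the docstring) ...
#
"""
An example demonstrating aft survival regression.
Run with:
  bin/spark-submit examples/src/main/python/ml/aft_survival_regression.py
"""
from __future__ import print_function  # imports must come after the docstring

if __name__ == "__main__":
    print(__doc__)  # help() and __doc__ now show the description above
```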
[GitHub] spark pull request #20404: [SPARK-23228][PYSPARK] Add Python Created jsparkS...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20404#discussion_r164269656

--- Diff: python/pyspark/sql/session.py ---
@@ -225,6 +225,7 @@ def __init__(self, sparkContext, jsparkSession=None):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            self._jvm.org.apache.spark.sql.SparkSession.setDefaultSession(self._jsparkSession)
--- End diff --

I think it would overwrite. Can we try something like this?

```diff
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c84023c43f..0bdfc88153f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,7 +213,10 @@ class SparkSession(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         if jsparkSession is None:
-            jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+            if self._jvm.SparkSession.getDefaultSession().isDefined():
+                jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+            else:
+                jsparkSession = self._jvm.SparkSession(self._jsc.sc())
         self._jsparkSession = jsparkSession
         self._jwrapped = self._jsparkSession.sqlContext()
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +228,8 @@ class SparkSession(object):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            if self._jvm.SparkSession.getDefaultSession().isEmpty():
+                self._jvm.SparkSession.setDefaultSession(self._jsparkSession)

     def _repr_html_(self):
         return """
@@ -759,6 +764,7 @@ class SparkSession(object):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
+        self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
         SparkSession._instantiatedSession = None
```
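[Editor's note] A rough illustration of the intended effect (not part of the patch; it pokes at the internal `_jvm` handle and assumes the suggested change above is applied):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With the change above, the Python-created session should also be registered
# as the JVM-side default session instead of leaving it unset.
jvm_default = spark._jvm.SparkSession.getDefaultSession()
print(jvm_default.isDefined())  # expected: True
```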
[GitHub] spark issue #20383: [SPARK-23200] Reset Kubernetes-specific config on Checkp...
Github user ssaavedra commented on the issue: https://github.com/apache/spark/pull/20383

spark-integration was created much later. I originally opened this as https://github.com/apache-spark-on-k8s/spark/pull/516 last September; the integration tests repo has only existed since December 1st. It would be interesting to have a test, but we'd need to set up checkpointing. In my test setup I used AWS S3 for the checkpoint storage, but in order not to depend on that, we'd have to deploy a distributed filesystem supported by Hadoop in minikube. Configuring it should be a problem orthogonal to this test scenario, but we may run into other integration issues.
[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20409

I was hesitant because it accesses a lot of internal instances via the JVM, as I wrote in the PR description. Let me just use it as a test.
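[Editor's note] For reference, `asNondeterministic` itself is public API; a minimal usage sketch with an illustrative UDF:

```python
import random
from pyspark.sql.functions import udf

# Marking the UDF nondeterministic tells the optimizer not to duplicate,
# reorder, or collapse its invocations during query planning.
rand_udf = udf(lambda: random.random(), "double").asNondeterministic()
```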
[GitHub] spark pull request #20410: [SPARK-23234][ML][PYSPARK] Remove setting default...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20410#discussion_r164267635

--- Diff: python/pyspark/ml/wrapper.py ---
@@ -118,10 +118,9 @@ def _transfer_params_to_java(self):
         """
         Transforms the embedded params to the companion Java object.
         """
-        paramMap = self.extractParamMap()
         for param in self.params:
-            if param in paramMap:
-                pair = self._make_java_param_pair(param, paramMap[param])
+            if param in self._paramMap:
+                pair = self._make_java_param_pair(param, self._paramMap[param])
--- End diff --

Thanks for your comment. There is only one problem: you _can't_ transfer the default values to Scala without using `set`. Indeed, `setDefault` is `protected`, so it can't be called by Python. Moreover, we already have test cases which ensure that the defaults in Python and Scala are the same. And since the user can't change a default, we are on the safe side. As I said before, I think a good next step would be to read the defaults from Scala in Python, which would guarantee they are always consistent.

Thanks for the suggestion, but I don't really agree that using `getOrDefault` would make the intent clearer: on the contrary, I think it might be confusing. I can use `isSet` as you suggested, instead, or I can add a comment. What do you think?
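[Editor's note] To make the set-vs-default distinction concrete, a small sketch against the public Params API (`Binarizer` chosen arbitrarily; the printed values assume its usual Python-side default of 0.0):

```python
from pyspark.ml.feature import Binarizer

b = Binarizer()
print(b.isSet(b.threshold))         # False: threshold only has a default value
print(b.getOrDefault(b.threshold))  # 0.0: falls back to the Python-side default

b.setThreshold(0.5)
print(b.isSet(b.threshold))         # True: explicitly set, so this is the kind
                                    # of value _transfer_params_to_java would
                                    # push to the companion Java object
```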
[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20405#discussion_r164267184

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1216,7 +1216,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
-    UnresolvedHint(name, parameters, logicalPlan)
+    UnresolvedHint(name, parameters, planWithBarrier)
--- End diff --

@jaceklaskowski Because the logical plan is wrapped in an analysis barrier, `ResolveBroadcastHints` can't traverse down it to reach the `UnresolvedRelation` at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L60-L61.
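[Editor's note] A rough way to observe what is at stake from the Python side (illustrative sizes; whether the hint takes effect depends on the fix under discussion):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
small = spark.range(100)
large = spark.range(1000000)

# If the broadcast hint is resolved, the plan should show a BroadcastHashJoin;
# if the hint is lost behind an analysis barrier, it degrades to a SortMergeJoin.
large.join(small.hint("broadcast"), "id").explain()
```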
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267076

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
--- End diff --

I think `get` is enough... when we are here it must be set
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267079

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
--- End diff --

ditto
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267103

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
+      allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
+    } else {
+      allWordCounts
+    }
+
+    val wordCounts = maybeFilteredWordCounts
+      .map { case (word, (count, dfCount)) => (word, count) }
--- End diff --

nit: `case (word, (count, _))`
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267128

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
--- End diff --

after addressing my previous comment this would need to be changed
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267057

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

this would cause a count also when it is not necessary, i.e. when `minDF`/`maxDF` are set not as a fraction but as an integer
[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20367#discussion_r164267118

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+    val filteringRequired = isSet(minDF) || isSet(maxDF)
+    val maybeInputSize = if (filteringRequired) {
+      Some(input.cache().count())
+    } else {
+      None
+    }
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {
-      $(minDF) * input.cache().count()
+      $(minDF) * maybeInputSize.getOrElse(1L)
     }
-    val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+    val maxDf = if ($(maxDF) >= 1.0) {
+      $(maxDF)
+    } else {
+      $(maxDF) * maybeInputSize.getOrElse(1L)
+    }
+    require(maxDf >= minDf, "maxDF must be >= minDF.")
+    val allWordCounts = input.flatMap { case (tokens) =>
       val wc = new OpenHashMap[String, Long]
       tokens.foreach { w =>
         wc.changeValue(w, 1L, _ + 1L)
       }
       wc.map { case (word, count) => (word, (count, 1)) }
     }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
       (wc1 + wc2, df1 + df2)
-    }.filter { case (word, (wc, df)) =>
-      df >= minDf
-    }.map { case (word, (count, dfCount)) =>
-      (word, count)
-    }.cache()
+    }
+
+    val maybeFilteredWordCounts = if (filteringRequired) {
+      allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
--- End diff --

nit: `case (_, (_, dfCount))` and I still would prefer to avoid the unnecessary parenthesis
[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20405#discussion_r164267100

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1216,7 +1216,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
-    UnresolvedHint(name, parameters, logicalPlan)
+    UnresolvedHint(name, parameters, planWithBarrier)
--- End diff --

I thought that that's what `ResolveBroadcastHints` does --> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L93-L101, doesn't it? I'm going to write a test case for it to confirm (and that's what I was asking for in the email to dev@spark the other day).