[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20416
  
**[Test build #86727 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86727/testReport)** for PR 20416 at commit [`f2026eb`](https://github.com/apache/spark/commit/f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20369
  
**[Test build #86735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86735/testReport)** for PR 20369 at commit [`d311d56`](https://github.com/apache/spark/commit/d311d5639b3af9123e0c6dbe38468f0172e06712).


---




[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20396
  
**[Test build #86731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86731/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).


---




[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20409
  
**[Test build #86729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86729/testReport)** for PR 20409 at commit [`b23ff02`](https://github.com/apache/spark/commit/b23ff02f543ecc92db574b808ea00f9ff7d236f8).


---




[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20375
  
**[Test build #86732 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86732/testReport)** for PR 20375 at commit [`caf581f`](https://github.com/apache/spark/commit/caf581f7f171912af4cebbc3a96887c7bb4a87e5).


---




[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20416
  
**[Test build #86727 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86727/testReport)** for PR 20416 at commit [`f2026eb`](https://github.com/apache/spark/commit/f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398).


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20146
  
**[Test build #86734 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86734/testReport)** for PR 20146 at commit [`b884fb5`](https://github.com/apache/spark/commit/b884fb5c0ce1e627390d08d8425721ea8e4d).


---




[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20403
  
**[Test build #86730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86730/testReport)** for PR 20403 at commit [`1f4d288`](https://github.com/apache/spark/commit/1f4d2884ba5b56e06427ce3d91cb6ac5f8f2b7b6).


---




[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20369
  
**[Test build #86733 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86733/testReport)** for PR 20369 at commit [`d311d56`](https://github.com/apache/spark/commit/d311d5639b3af9123e0c6dbe38468f0172e06712).


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20414
  
**[Test build #86728 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20146
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20415
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #20369: [SPARK-23196] Unify continuous and microbatch V2 sinks

2018-01-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/20369
  
Retest this please.


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20146
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/308/
Test PASSed.


---




[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20396
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20396
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/307/
Test PASSed.


---




[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20403
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20409
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/305/
Test PASSed.


---




[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20403
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/306/
Test PASSed.


---




[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20409
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20414
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/304/
Test PASSed.


---




[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20416
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20415
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...

2018-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20416
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/303/
Test PASSed.


---




[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...

2018-01-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20403
  
ok to test


---




[GitHub] spark issue #20403: [SPARK-23238][PYTHON] Externalize SQLConf spark.sql.exec...

2018-01-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20403
  
retest this please


---




[GitHub] spark issue #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVectorizer

2018-01-27 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/20367
  
LGTM, thanks


---




[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...

2018-01-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20375
  
retest this please


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275764
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,48 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val countingRequired = $(minDF) < 1.0 || $(maxDF) < 1.0
+val maybeInputSize = if (countingRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.get
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.get
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

Making a variable here for the sake of clarity.


---




[GitHub] spark pull request #19575: [SPARK-22221][DOCS] Adding User Documentation for...

2018-01-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19575#discussion_r164275737
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1640,6 +1640,133 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
 You may run `./bin/spark-sql --help` for a complete list of all available
 options.
 
+# PySpark Usage Guide for Pandas with Apache Arrow
+
+## Apache Arrow in Spark
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to 
efficiently transfer
+data between JVM and Python processes. This currently is most beneficial 
to Python users that
+work with Pandas/NumPy data. Its usage is not automatic and might require 
some minor
+changes to configuration or code to take full advantage and ensure 
compatibility. This guide will
+give a high-level description of how to use Arrow in Spark and highlight 
any differences when
+working with Arrow-enabled data.
+
+### Ensure PyArrow Installed
+
+If you install PySpark using pip, then PyArrow can be brought in as an 
extra dependency of the
+SQL module with the command `pip install pyspark[sql]`. Otherwise, you 
must ensure that PyArrow
+is installed and available on all cluster nodes. The current supported 
version is 0.8.0.
+You can install using pip or conda from the conda-forge channel. See 
PyArrow
+[installation](https://arrow.apache.org/docs/python/install.html) for 
details.
+
+## Enabling for Conversion to/from Pandas
+
+Arrow is available as an optimization when converting a Spark DataFrame to 
Pandas using the call
+`toPandas()` and when creating a Spark DataFrame from Pandas with 
`createDataFrame(pandas_df)`.
+To use Arrow when executing these calls, users need to first set the Spark 
configuration
+'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
+
+
+
+{% include_example dataframe_with_arrow python/sql/arrow.py %}
+
+
+
+Using the above optimizations with Arrow will produce the same results as 
when Arrow is not
+enabled. Note that even with Arrow, `toPandas()` results in the collection 
of all records in the
+DataFrame to the driver program and should be done on a small subset of 
the data. Not all Spark
+data types are currently supported and an error can be raised if a column 
has an unsupported type,
+see [Supported Types](#supported-sql-arrow-types). If an error occurs 
during `createDataFrame()`,
+Spark will fall back to create the DataFrame without Arrow.
+
+## Pandas UDFs (a.k.a. Vectorized UDFs)
+
+Pandas UDFs are user defined functions that are executed by Spark using 
Arrow to transfer data and
+Pandas to work with the data. A Pandas UDF is defined using the keyword 
`pandas_udf` as a decorator
+or to wrap the function, no additional configuration is required. 
Currently, there are two types of
+Pandas UDF: Scalar and Group Map.
+
+### Scalar
+
+Scalar Pandas UDFs are used for vectorizing scalar operations. They can be 
used with functions such
+as `select` and `withColumn`. The Python function should take 
`pandas.Series` as inputs and return
+a `pandas.Series` of the same length. Internally, Spark will execute a 
Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset 
of the data, then
+concatenating the results together.
+
+The following example shows how to create a scalar Pandas UDF that 
computes the product of 2 columns.
+
+
+
+{% include_example scalar_pandas_udf python/sql/arrow.py %}
+
+
+
+### Group Map
+Group map Pandas UDFs are used with `groupBy().apply()` which implements 
the "split-apply-combine" pattern.
--- End diff --

`Group Map`
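
To make the quoted guide concrete, here is a minimal, self-contained sketch of the Arrow-backed conversion plus a scalar Pandas UDF described above (my own illustration, not the documented `python/sql/arrow.py` example; it assumes Spark 2.3 with PyArrow 0.8.0 installed, and the column names and data are made up):

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf

spark = SparkSession.builder.appName("arrow-sketch").getOrCreate()
# Arrow-based conversion is disabled by default; enable it explicitly.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Pandas -> Spark and Spark -> Pandas conversions both go through Arrow once enabled.
pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
df = spark.createDataFrame(pdf)
result_pdf = df.toPandas()

# Scalar Pandas UDF: receives pandas.Series inputs and must return a
# pandas.Series of the same length (here, the product of two columns).
@pandas_udf("double")
def multiply(a, b):
    return a * b

df.withColumn("product", multiply(col("a"), col("b"))).show()
spark.stop()
```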


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275714
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
--- End diff --

Done.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275722
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
+  allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
+} else {
+  allWordCounts
+}
+
+val wordCounts = maybeFilteredWordCounts
+  .map { case (word, (count, dfCount)) => (word, count) }
--- End diff --

Changed.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275721
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
+  allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
--- End diff --

Changed.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275712
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
--- End diff --

Changed.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275697
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

Changed.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread ymazari
Github user ymazari commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164275706
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
--- End diff --

Right. I guess I was overdoing it here.


---




[GitHub] spark issue #20372: [SPARK-23249] Improved block merging logic for partition...

2018-01-27 Thread glentakahashi
Github user glentakahashi commented on the issue:

https://github.com/apache/spark/pull/20372
  
Created https://issues.apache.org/jira/browse/SPARK-23249


---




[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20396
  
**[Test build #4080 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4080/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164273743
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

Yes, this is the right condition for deciding whether to `filter` or not, but the right condition for deciding whether to do the count is the one you just stated.


---




[GitHub] spark issue #20396: [SPARK-23217][ML] Add cosine distance measure to Cluster...

2018-01-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20396
  
**[Test build #4080 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4080/testReport)** for PR 20396 at commit [`8a68f75`](https://github.com/apache/spark/commit/8a68f758a7a41f6c2a9a58f54a982745665be6a6).


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164273656
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

I imagine you just want the same conditions that are used below, like 
`$(minDF) >= 1.0`


---




[GitHub] spark issue #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrin...

2018-01-27 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20416
  
cc @yanboliang, @ueshin, @viirya and @MLnick, could you guys check if it 
makes sense to you when you are available?


---




[GitHub] spark pull request #20416: [SPARK-23248][PYTHON][EXAMPLES] Relocate module d...

2018-01-27 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/20416

[SPARK-23248][PYTHON][EXAMPLES] Relocate module docstrings to the top in PySpark examples

## What changes were proposed in this pull request?

This PR proposes to relocate the module docstrings in the example scripts to the top. These seem to be mistakes. So, for example, the code below

```python
>>> help(aft_survival_regression)
```

shows the module docstrings for examples as below:

**Before**

```
Help on module aft_survival_regression:

NAME
aft_survival_regression

...

DESCRIPTION
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 
2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

...

(END)
```

**After**

```
Help on module aft_survival_regression:

NAME
aft_survival_regression

...

DESCRIPTION
An example demonstrating aft survival regression.
Run with:
  bin/spark-submit examples/src/main/python/ml/aft_survival_regression.py

(END)
```
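
For illustration, the change amounts to moving the module docstring above the imports so that Python records it as the module's `__doc__` (a hypothetical, minimal example file sketched here for clarity, not the actual diff):

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. (The license header stays as comments;
# comments before the docstring do not affect it.)
#
"""
An example demonstrating aft survival regression.
Run with:
  bin/spark-submit examples/src/main/python/ml/aft_survival_regression.py
"""
from __future__ import print_function

# Because the docstring is now the first statement in the module,
# help(aft_survival_regression) shows it under DESCRIPTION instead of
# falling back to the license comment block.
if __name__ == "__main__":
    print(__doc__)
```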

## How was this patch tested?

Manually checked.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark module-docstring-example

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20416


commit f2026ebb3b0a0bf00f0f9229fceae1e70b3b6398
Author: hyukjinkwon 
Date:   2018-01-27T12:05:26Z

Relocate module docstrings to the top in PySpark examples




---




[GitHub] spark pull request #20404: [SPARK-23228][PYSPARK] Add Python Created jsparkS...

2018-01-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20404#discussion_r164269656
  
--- Diff: python/pyspark/sql/session.py ---
@@ -225,6 +225,7 @@ def __init__(self, sparkContext, jsparkSession=None):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            self._jvm.org.apache.spark.sql.SparkSession.setDefaultSession(self._jsparkSession)
--- End diff --

I think it would overwrite. Can we try something like this?

```diff
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c84023c43f..0bdfc88153f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,7 +213,10 @@ class SparkSession(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         if jsparkSession is None:
-            jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+            if self._jvm.SparkSession.getDefaultSession().isDefined():
+                jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+            else:
+                jsparkSession = self._jvm.SparkSession(self._jsc.sc())
         self._jsparkSession = jsparkSession
         self._jwrapped = self._jsparkSession.sqlContext()
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +228,8 @@ class SparkSession(object):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            if self._jvm.SparkSession.getDefaultSession().isEmpty():
+                self._jvm.SparkSession.setDefaultSession(self._jsparkSession)

     def _repr_html_(self):
         return """
@@ -759,6 +764,7 @@ class SparkSession(object):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
+        self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
         SparkSession._instantiatedSession = None
```


---




[GitHub] spark issue #20383: [SPARK-23200] Reset Kubernetes-specific config on Checkp...

2018-01-27 Thread ssaavedra
Github user ssaavedra commented on the issue:

https://github.com/apache/spark/pull/20383
  
The spark-integration repository was created much later. I originally opened this as 
https://github.com/apache-spark-on-k8s/spark/pull/516 last September, whereas the 
integration tests repo has only existed since December 1st.

It would be interesting to have a test, but we'd need to set up checkpointing. In my 
test setup I used AWS S3 for the checkpoint storage, but in order not to depend on 
that we'd have to deploy a Hadoop-supported distributed filesystem in minikube. 
Configuring it should be a problem orthogonal to this test scenario, but we may run 
into other integration issues.
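
For reference, the checkpoint-recovery path such a test would need to exercise looks roughly like this (a hedged PySpark Streaming sketch; the `s3a://` location, port, and batch interval are placeholders, not values from this PR):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "s3a://my-bucket/streaming-checkpoint"  # placeholder location

def create_context():
    # Runs only when no checkpoint exists yet; on restart the driver rebuilds
    # the StreamingContext (including its SparkConf) from the checkpoint,
    # which is where stale cluster-specific settings can come back in.
    sc = SparkContext(appName="checkpoint-recovery-sketch")
    ssc = StreamingContext(sc, batchDuration=5)
    ssc.checkpoint(CHECKPOINT_DIR)
    lines = ssc.socketTextStream("localhost", 9999)
    lines.count().pprint()
    return ssc

ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()
```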


---




[GitHub] spark issue #20409: [SPARK-23233][PYTHON] Reset the cache in asNondeterminis...

2018-01-27 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20409
  
I was hesitant because it accesses a lot of internal instances via the JVM, as I wrote in the PR description. Let me just use it as a test.
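
For context, the user-facing behavior under discussion (a minimal sketch added here for illustration; the lambda and values are made up): `asNondeterministic()` flips the determinism flag on a Python UDF, and the PR is about making sure the cached Java UDF is rebuilt so the flag actually takes effect.

```python
import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.master("local[1]").appName("nondeterministic-udf").getOrCreate()

# Marking the UDF as nondeterministic tells the optimizer not to reorder or
# duplicate its evaluation; the underlying Java UDF has to be recreated after
# the flag changes, which is what resetting the cache achieves.
rand_udf = udf(lambda: random.randint(0, 100), "int").asNondeterministic()
spark.range(3).select(rand_udf().alias("value")).show()
spark.stop()
```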


---




[GitHub] spark pull request #20410: [SPARK-23234][ML][PYSPARK] Remove setting default...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20410#discussion_r164267635
  
--- Diff: python/pyspark/ml/wrapper.py ---
@@ -118,10 +118,9 @@ def _transfer_params_to_java(self):
 """
 Transforms the embedded params to the companion Java object.
 """
-paramMap = self.extractParamMap()
 for param in self.params:
-if param in paramMap:
-pair = self._make_java_param_pair(param, paramMap[param])
+if param in self._paramMap:
+pair = self._make_java_param_pair(param, 
self._paramMap[param])
--- End diff --

Thanks for your comment. There is only one problem: you _can't_ transfer the default 
values to Scala without using `set`. Indeed, `setDefault` is `protected`, so it can't 
be called from Python.
Moreover, we already have test cases which ensure that the defaults in Python and 
Scala are the same. And since the user can't change a default, we are on the safe 
side. As I said before, I think a good next step would be to read the defaults from 
Scala in Python, which would guarantee that they are always consistent.
Thanks for the suggestion, but I don't really agree that using `getOrDefault` would 
make the intent clearer: on the contrary, I think it might be confusing. I can use 
`isSet` as you suggested instead, or I can add a comment. What do you think?
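
For what it's worth, the set-versus-default distinction being discussed can be seen from the user-facing API (a small runnable sketch added here for illustration; the estimator and values are arbitrary):

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.master("local[1]").appName("param-transfer-sketch").getOrCreate()

lr = LogisticRegression(maxIter=5)   # maxIter explicitly set, regParam left at its default

print(lr.isSet(lr.maxIter))          # True: explicitly set, so it would be transferred to Java
print(lr.isSet(lr.regParam))         # False: unset, so the Scala-side default stays untouched
print(lr.getOrDefault(lr.regParam))  # 0.0: the default, kept consistent between Python and Scala

spark.stop()
```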


---




[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...

2018-01-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20405#discussion_r164267184
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1216,7 +1216,7 @@ class Dataset[T] private[sql](
*/
   @scala.annotation.varargs
   def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
-UnresolvedHint(name, parameters, logicalPlan)
+UnresolvedHint(name, parameters, planWithBarrier)
--- End diff --

@jaceklaskowski Because the logical plan is wrapped in an analysis barrier, `ResolveBroadcastHints` can't traverse down into it to reach the `UnresolvedRelation` at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L60-L61.
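
As a user-facing illustration of what is at stake (a hypothetical PySpark repro sketch, not a test from this PR; the sizes and column names are made up): when the hint survives analysis, the physical plan shows a broadcast join.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("broadcast-hint-check").getOrCreate()

df1 = spark.range(1000).withColumnRenamed("id", "k")
df2 = spark.range(10).withColumnRenamed("id", "k")

# If ResolveBroadcastHints can reach the hinted relation, the plan below should
# contain a BroadcastHashJoin; if the hint is dropped, Spark falls back to
# whatever join strategy the optimizer would otherwise pick.
joined = df1.join(df2.hint("broadcast"), "k")
joined.explain()

spark.stop()
```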




---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267076
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
--- End diff --

I think `get` is enough... when we are here, it must be set


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267079
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
--- End diff --

ditto


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267103
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
+  allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
+} else {
+  allWordCounts
+}
+
+val wordCounts = maybeFilteredWordCounts
+  .map { case (word, (count, dfCount)) => (word, count) }
--- End diff --

nit: `case (word, (count, _))`


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267128
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
--- End diff --

after addressing my previous comment this would need to be changed


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267057
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
--- End diff --

This would cause a count even when it is not necessary, i.e. when `minDF`/`maxDF` are set not as a fraction but as an integer.


---




[GitHub] spark pull request #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVec...

2018-01-27 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20367#discussion_r164267118
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---
@@ -155,24 +182,47 @@ class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
 transformSchema(dataset.schema, logging = true)
 val vocSize = $(vocabSize)
 val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+val filteringRequired = isSet(minDF) || isSet(maxDF)
+val maybeInputSize = if (filteringRequired) {
+  Some(input.cache().count())
+} else {
+  None
+}
 val minDf = if ($(minDF) >= 1.0) {
   $(minDF)
 } else {
-  $(minDF) * input.cache().count()
+  $(minDF) * maybeInputSize.getOrElse(1L)
 }
-val wordCounts: RDD[(String, Long)] = input.flatMap { case (tokens) =>
+val maxDf = if ($(maxDF) >= 1.0) {
+  $(maxDF)
+} else {
+  $(maxDF) * maybeInputSize.getOrElse(1L)
+}
+require(maxDf >= minDf, "maxDF must be >= minDF.")
+val allWordCounts = input.flatMap { case (tokens) =>
   val wc = new OpenHashMap[String, Long]
   tokens.foreach { w =>
 wc.changeValue(w, 1L, _ + 1L)
   }
   wc.map { case (word, count) => (word, (count, 1)) }
 }.reduceByKey { case ((wc1, df1), (wc2, df2)) =>
   (wc1 + wc2, df1 + df2)
-}.filter { case (word, (wc, df)) =>
-  df >= minDf
-}.map { case (word, (count, dfCount)) =>
-  (word, count)
-}.cache()
+}
+
+val maybeFilteredWordCounts = if (filteringRequired) {
+  allWordCounts.filter { case (word, (wc, df)) => (df >= minDf) && (df <= maxDf) }
--- End diff --

nit: `case (_, (_, dfCount))`, and I would still prefer to avoid the unnecessary parentheses


---




[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...

2018-01-27 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20405#discussion_r164267100
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1216,7 +1216,7 @@ class Dataset[T] private[sql](
*/
   @scala.annotation.varargs
   def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
-UnresolvedHint(name, parameters, logicalPlan)
+UnresolvedHint(name, parameters, planWithBarrier)
--- End diff --

I thought that's what `ResolveBroadcastHints` does --> 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L93-L101, 
doesn't it? I'm going to write a test case for it to confirm (and that's what I was 
asking for in the email to dev@spark the other day).


---



