[GitHub] spark pull request #21042: [SPARK-22883][ML] ML test for StructuredStreaming...

2018-04-11 Thread jkbradley
Github user jkbradley closed the pull request at:

https://github.com/apache/spark/pull/21042


---




spark git commit: [SPARK-22883][ML] ML test for StructuredStreaming: spark.ml.feature, I-M

2018-04-11 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 320269e49 -> acfc156df


[SPARK-22883][ML] ML test for StructuredStreaming: spark.ml.feature, I-M

This backports https://github.com/apache/spark/pull/20964 to branch-2.3.

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!
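For reference, the added checks follow the testTransformer pattern below — a minimal sketch modeled on the IDFSuite hunk further down. The suite is assumed to extend MLTest (which supplies testTransformer and testImplicits), and `df`/`idfModel` are the DataFrame and fitted model from that test; testTransformer applies the same row-level check to the transformer's output on the input as a batch DataFrame and as a streaming source.

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.sql.Row

// Sketch only: testTransformer is provided by the MLTest base class.
// It feeds `df` to `idfModel` both as a batch DataFrame and as a stream,
// running the row-level assertion against both results.
testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
  case Row(actual: Vector, expected: Vector) =>
    assert(actual ~== expected absTol 1e-5,
      "Transformed vector differs from the expected vector.")
}
```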

Author: Joseph K. Bradley 

Author: Joseph K. Bradley 

Closes #21042 from jkbradley/SPARK-22883-part2-2.3backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acfc156d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acfc156d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acfc156d

Branch: refs/heads/branch-2.3
Commit: acfc156df551632007a47b7ec7a7c901a713082d
Parents: 320269e
Author: Joseph K. Bradley 
Authored: Wed Apr 11 11:41:50 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Apr 11 11:41:50 2018 -0700

--
 .../org/apache/spark/ml/feature/IDFSuite.scala  | 14 +++---
 .../apache/spark/ml/feature/ImputerSuite.scala  | 31 ++---
 .../spark/ml/feature/InteractionSuite.scala | 46 ++--
 .../spark/ml/feature/MaxAbsScalerSuite.scala| 14 +++---
 .../spark/ml/feature/MinHashLSHSuite.scala  | 25 ---
 .../spark/ml/feature/MinMaxScalerSuite.scala| 14 +++---
 .../apache/spark/ml/feature/NGramSuite.scala|  2 +-
 7 files changed, 89 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/acfc156d/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
 
-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   Vectors.dense(0.0, 1.0, 2.0, 3.0),
   Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
 )
-val numOfData = data.size
+val numOfData = data.length
 val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
   math.log((numOfData + 1.0) / (x + 1.0))
 })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
 
 MLTestingUtils.checkCopyAndUids(idfEst, idfModel)
 
-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
   case Row(x: Vector, y: Vector) =>
 assert(x ~== y absTol 1e-5, "Transformed vector is different with 
expected vector.")
 }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   Vectors.dense(0.0, 1.0, 2.0, 3.0),
   Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
 )
-val numOfData = data.size
+val numOfData = data.length
 val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
   if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
 })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   .setMinDocFreq(1)
   .fit(df)
 
-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
   case Row(x: Vector, y: Vector) =>
 assert(x ~== y absTol 1e-5, "Transformed vector is different with 
expected vector.")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/acfc156d/mllib/src/test/scala/org/apache/spark

[GitHub] spark issue #21042: [SPARK-22883][ML] ML test for StructuredStreaming: spark...

2018-04-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21042
  
Since this had an LGTM for the original PR, has no changes, and tests pass, I'll merge this into branch-2.3


---




[GitHub] spark issue #21042: [SPARK-22883][ML] ML test for StructuredStreaming: spark...

2018-04-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21042
  
I haven't made any changes from the PR which was merged into master in 
https://github.com/apache/spark/pull/20964


---




[GitHub] spark pull request #21042: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-11 Thread jkbradley
GitHub user jkbradley opened a pull request:

https://github.com/apache/spark/pull/21042

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

This backports https://github.com/apache/spark/pull/20964 to branch-2.3.

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkbradley/spark SPARK-22883-part2-2.3backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21042


commit a4209436f177ee59ee39e1c715694f533ecc3d36
Author: Joseph K. Bradley 
Date:   2018-04-11T16:59:38Z

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley 

Closes #20964 from jkbradley/SPARK-22883-part2.




---




[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....

2018-04-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964
  
Merging with master


---




spark git commit: [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

2018-04-11 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 3cb82047f -> 75a183071


[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley 

Closes #20964 from jkbradley/SPARK-22883-part2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75a18307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75a18307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75a18307

Branch: refs/heads/master
Commit: 75a183071c4ed2e407c930edfdf721779662b3ee
Parents: 3cb8204
Author: Joseph K. Bradley 
Authored: Wed Apr 11 09:59:38 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Apr 11 09:59:38 2018 -0700

--
 .../org/apache/spark/ml/feature/IDFSuite.scala  | 14 +++---
 .../apache/spark/ml/feature/ImputerSuite.scala  | 31 ++---
 .../spark/ml/feature/InteractionSuite.scala | 46 ++--
 .../spark/ml/feature/MaxAbsScalerSuite.scala| 14 +++---
 .../spark/ml/feature/MinHashLSHSuite.scala  | 25 ---
 .../spark/ml/feature/MinMaxScalerSuite.scala| 14 +++---
 .../apache/spark/ml/feature/NGramSuite.scala|  2 +-
 7 files changed, 89 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
 
-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   Vectors.dense(0.0, 1.0, 2.0, 3.0),
   Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
 )
-val numOfData = data.size
+val numOfData = data.length
 val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
   math.log((numOfData + 1.0) / (x + 1.0))
 })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
 
 MLTestingUtils.checkCopyAndUids(idfEst, idfModel)
 
-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
   case Row(x: Vector, y: Vector) =>
 assert(x ~== y absTol 1e-5, "Transformed vector is different with 
expected vector.")
 }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   Vectors.dense(0.0, 1.0, 2.0, 3.0),
   Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
 )
-val numOfData = data.size
+val numOfData = data.length
 val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
   if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
 })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
   .setMinDocFreq(1)
   .fit(df)
 
-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
   case Row(x: Vector, y: Vector) =>
 assert(x ~== y absTol 1e-5, "Transformed vector is different with 
expected vector.")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
--
diff --git 

[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964
  
I rebased off of master because of the merge warning in the last tests.
I did not have to resolve any conflicts.
I'll merge this once tests pass.


---




spark git commit: [SPARK-23944][ML] Add the set method for the two LSHModel

2018-04-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4f1e8b9bb -> 7c7570d46


[SPARK-23944][ML] Add the set method for the two LSHModel

## What changes were proposed in this pull request?

Add two setter methods (setInputCol and setOutputCol) for LSHModel in LSH.scala, BucketedRandomProjectionLSH.scala, and MinHashLSH.scala

## How was this patch tested?

New tests for the param setup were added to:

- BucketedRandomProjectionLSHSuite.scala

- MinHashLSHSuite.scala

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Lu WANG 

Closes #21015 from ludatabricks/SPARK-23944.
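For illustration, a minimal usage sketch of the new model-level setters (the DataFrames `df` and `df2` and the column names are hypothetical; the estimator-level setters already existed):

```scala
import org.apache.spark.ml.feature.MinHashLSH

// Fit with one column layout.
val model = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")
  .fit(df)

// New in this patch: the fitted model itself can be re-pointed at different
// input/output columns via the setInputCol/setOutputCol overrides on LSHModel.
val rehashed = model
  .setInputCol("otherFeatures")
  .setOutputCol("otherHashes")
  .transform(df2)
```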


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c7570d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c7570d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c7570d4

Branch: refs/heads/master
Commit: 7c7570d466a8ded51e580eb6a28583bd9a9c5337
Parents: 4f1e8b9
Author: Lu WANG 
Authored: Tue Apr 10 17:26:06 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 10 17:26:06 2018 -0700

--
 .../spark/ml/feature/BucketedRandomProjectionLSH.scala   | 8 
 mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala   | 6 ++
 .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala  | 8 
 .../spark/ml/feature/BucketedRandomProjectionLSHSuite.scala  | 8 
 .../scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala  | 8 
 5 files changed, 38 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
index 36a46ca..41eaaf9 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -73,6 +73,14 @@ class BucketedRandomProjectionLSHModel private[ml](
 private[ml] val randUnitVectors: Array[Vector])
   extends LSHModel[BucketedRandomProjectionLSHModel] with 
BucketedRandomProjectionLSHParams {
 
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setInputCol(value: String): this.type = super.set(inputCol, 
value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setOutputCol(value: String): this.type = super.set(outputCol, 
value)
+
   @Since("2.1.0")
   override protected[ml] val hashFunction: Vector => Array[Vector] = {
 key: Vector => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
index 1c9f47a..a70931f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -65,6 +65,12 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
   extends Model[T] with LSHParams with MLWritable {
   self: T =>
 
+  /** @group setParam */
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
   /**
* The hash function of LSH, mapping an input feature vector to multiple 
hash vectors.
* @return The mapping of LSH function.

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
index 145422a..556848e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
@@ -51,6 +51,14 @@ class MinHashLSHModel private[ml](
 private[ml] val randCoefficients: Array[(Int, Int)])
   extends LSHModel[MinHashLSHModel] {
 
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setInputCol(value: String): this.type = super.set(inputCol, 
value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setOutputCol(value: String): this.type = super.set(outputCol, 
value)
+
   @Since("2.1.0")
   override protected[ml] val hashFunction: Vector => Array[Vector] = {
 elems: Vector => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala
-

[GitHub] spark issue #21015: [SPARK-23944][ML] Add the set method for the two LSHMode...

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21015
  
LGTM
Merging with master
Thanks @ludatabricks !


---




[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/15770
  
@wangmiao1981  Do let me know if you're too busy now to resume this; I know 
it's been a long time.  Thanks!


---




[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964
  
Thanks!  I'll rerun tests since they are stale and merge after they pass.


---




spark git commit: [SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleInvalid

2018-04-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master adb222b95 -> 4f1e8b9bb


[SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleInvalid

## What changes were proposed in this pull request?

add python api for VectorAssembler handleInvalid

## How was this patch tested?

Add doctest

Author: Huaxin Gao 

Closes #21003 from huaxingao/spark-23871.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f1e8b9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f1e8b9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f1e8b9b

Branch: refs/heads/master
Commit: 4f1e8b9bb7d795d4ca3d5cd5dcc0f9419e52dfae
Parents: adb222b
Author: Huaxin Gao 
Authored: Tue Apr 10 15:41:45 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 10 15:41:45 2018 -0700

--
 .../spark/ml/feature/VectorAssembler.scala  | 12 +++---
 python/pyspark/ml/feature.py| 42 +---
 2 files changed, 43 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f1e8b9b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 6bf4aa3..4061154 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -71,12 +71,12 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
*/
   @Since("2.4.0")
   override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
-"""Param for how to handle invalid data (NULL values). Options are 'skip' 
(filter out rows with
-  |invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
-  |output). Column lengths are taken from the size of ML Attribute Group, 
which can be set using
-  |`VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths 
can also be inferred
-  |from first rows of the data since it is safe to do so but only in case 
of 'error' or 'skip'.
-  |""".stripMargin.replaceAll("\n", " "),
+"""Param for how to handle invalid data (NULL and NaN values). Options are 
'skip' (filter out
+  |rows with invalid data), 'error' (throw an error), or 'keep' (return 
relevant number of NaN
+  |in the output). Column lengths are taken from the size of ML Attribute 
Group, which can be
+  |set using `VectorSizeHint` in a pipeline before `VectorAssembler`. 
Column lengths can also
+  |be inferred from first rows of the data since it is safe to do so but 
only in case of 'error'
+  |or 'skip'.""".stripMargin.replaceAll("\n", " "),
 ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
 
   setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f1e8b9b/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 5a3e0dd..cdda30c 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2701,7 +2701,8 @@ class Tokenizer(JavaTransformer, HasInputCol, 
HasOutputCol, JavaMLReadable, Java
 
 
 @inherit_doc
-class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, 
JavaMLReadable, JavaMLWritable):
+class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, 
HasHandleInvalid, JavaMLReadable,
+  JavaMLWritable):
 """
 A feature transformer that merges multiple columns into a vector column.
 
@@ -2719,25 +2720,56 @@ class VectorAssembler(JavaTransformer, HasInputCols, 
HasOutputCol, JavaMLReadabl
 >>> loadedAssembler = VectorAssembler.load(vectorAssemblerPath)
 >>> loadedAssembler.transform(df).head().freqs == 
vecAssembler.transform(df).head().freqs
 True
+>>> dfWithNullsAndNaNs = spark.createDataFrame(
+...[(1.0, 2.0, None), (3.0, float("nan"), 4.0), (5.0, 6.0, 7.0)], 
["a", "b", "c"])
+>>> vecAssembler2 = VectorAssembler(inputCols=["a", "b", "c"], 
outputCol="features",
+...handleInvalid="keep")
+>>> vecAssembler2.transform(dfWithNullsAndNaNs).show()
++---+---+----+-------------+
+|  a|  b|   c|     features|
++---+---+----+-------------+
+|1.0|2.0|null|[1.0,2.0,NaN]|
+|3.0|NaN| 4.0|[3.0,NaN,4.0]|
+|5.0|6.0| 7.0|[5.0,6.0,7.0]|
++---+---+----+-------------+
+...
+>>> 
vecAssembler2.setParams(handleInvalid="skip").transform(dfWithNullsAndNaNs).show()
++---+---+---+-+
+|  a|  b|  c| feature

[GitHub] spark issue #21003: [SPARK-23871][ML][PYTHON]add python api for VectorAssemb...

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21003
  
Merging with master
Thanks @huaxingao !


---




[GitHub] spark issue #21003: [SPARK-23871][ML][PYTHON]add python api for VectorAssemb...

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21003
  
LGTM
But would you mind making one fix in the doc here and in Scala?  "Param for 
how to handle invalid data (NULL values)" should actually read "Param for how 
to handle invalid data (NULL **and NaN** values)"

Thanks!


---




[GitHub] spark issue #21030: typo rawPredicition changed to rawPrediction

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21030
  
LGTM pending tests
Thanks for finding & fixing this!

Would you mind creating a JIRA and linking it to 
https://issues.apache.org/jira/browse/SPARK-21856 ?  I'd like a tracking JIRA 
since I'd like to backport it to branch-2.3.


---




[GitHub] spark issue #21030: typo rawPredicition changed to rawPrediction

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21030
  
ok to test


---




spark git commit: [SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pyspark.ml

2018-04-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master e17965891 -> adb222b95


[SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pyspark.ml

## What changes were proposed in this pull request?

Kolmogorov-Smirnoff test Python API in `pyspark.ml`

**Note:** The API variant that takes a `cdf` function is a little difficult to support in Python. We can add it in a follow-up PR.

## How was this patch tested?

doctest

Author: WeichenXu 

Closes #20904 from WeichenXu123/ks-test-py.
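For reference, the equivalent Scala call is sketched below (a minimal sketch; `spark` is an assumed SparkSession and the sample column name is hypothetical):

```scala
import org.apache.spark.ml.stat.KolmogorovSmirnovTest

import spark.implicits._
val sample = Seq(-1.0, 0.0, 1.0).toDF("sample")

// Test against a normal distribution with parameters 0.0 and 1.0, matching the
// doctest added in this patch. The result is a single-row DataFrame with
// `pValue` and `statistic` columns.
val result = KolmogorovSmirnovTest.test(sample, "sample", "norm", 0.0, 1.0).first()
println(s"pValue=${result.getAs[Double]("pValue")}, statistic=${result.getAs[Double]("statistic")}")
```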


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adb222b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adb222b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adb222b9

Branch: refs/heads/master
Commit: adb222b957f327a69929b8f16fa5ebc071fa99e3
Parents: e179658
Author: WeichenXu 
Authored: Tue Apr 10 11:18:14 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 10 11:18:14 2018 -0700

--
 .../spark/ml/stat/KolmogorovSmirnovTest.scala   |  29 +--
 python/pyspark/ml/stat.py   | 181 +--
 2 files changed, 138 insertions(+), 72 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/adb222b9/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
index c62d746..af8ff64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.function.Function
 import org.apache.spark.ml.util.SchemaUtils
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions.col
 
 /**
@@ -59,7 +59,7 @@ object KolmogorovSmirnovTest {
* distribution of the sample data and the theoretical distribution we can 
provide a test for the
* the null hypothesis that the sample data comes from that theoretical 
distribution.
*
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data 
to test
* @param sampleCol Name of sample column in dataset, of any numerical type
* @param cdf a `Double => Double` function to calculate the theoretical CDF 
at a given value
* @return DataFrame containing the test result for the input sampled data.
@@ -68,10 +68,10 @@ object KolmogorovSmirnovTest {
*  - `statistic: Double`
*/
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): 
DataFrame = {
+  def test(dataset: Dataset[_], sampleCol: String, cdf: Double => Double): 
DataFrame = {
 val spark = dataset.sparkSession
 
-val rdd = getSampleRDD(dataset, sampleCol)
+val rdd = getSampleRDD(dataset.toDF(), sampleCol)
 val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, cdf)
 spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
   testResult.pValue, testResult.statistic)))
@@ -81,10 +81,11 @@ object KolmogorovSmirnovTest {
* Java-friendly version of `test(dataset: DataFrame, sampleCol: String, 
cdf: Double => Double)`
*/
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String,
-cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
-val f: Double => Double = x => cdf.call(x)
-test(dataset, sampleCol, f)
+  def test(
+  dataset: Dataset[_],
+  sampleCol: String,
+  cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+test(dataset, sampleCol, (x: Double) => cdf.call(x))
   }
 
   /**
@@ -92,10 +93,11 @@ object KolmogorovSmirnovTest {
* distribution equality. Currently supports the normal distribution, taking 
as parameters
* the mean and standard deviation.
*
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data 
to test
* @param sampleCol Name of sample column in dataset, of any numerical type
* @param distName a `String` name for a theoretical distribution, currently 
only support "norm".
-   * @param params `Double*` specifying the parameters to be used for the 
theoretical distribution
+   * @param params `Double*` specifying the parameters to be used for the 
theoretical distribution.
+*  For "norm" distribution, the parameters includes mean and 
variance.
* @return DataFrame containing the test result for the input sampled data.
*

[GitHub] spark issue #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff test Pyth...

2018-04-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20904
  
LGTM
Thanks for the PR!
Merging with master


---




[GitHub] spark issue #21015: [SPARK-23944][ML] Add the set method for the two LSHMode...

2018-04-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21015
  
add to whitelist


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r180228711
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -127,13 +113,86 @@ class Correlation(object):
 def corr(dataset, column, method="pearson"):
 """
 Compute the correlation matrix with specified method using dataset.
+
+:param dataset:
+  A Dataset or a DataFrame.
+:param column:
+  The name of the column of vectors for which the correlation 
coefficient needs
+  to be computed. This must be a column of the dataset, and it 
must contain
+  Vector objects.
+:param method:
+  String specifying the method to use for computing correlation.
+  Supported: `pearson` (default), `spearman`.
+:return:
+  A DataFrame that contains the correlation matrix of the column 
of vectors. This
+  DataFrame contains a single row and a single column of name
+  '$METHODNAME($COLUMN)'.
 """
 sc = SparkContext._active_spark_context
 javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
 args = [_py2java(sc, arg) for arg in (dataset, column, method)]
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
--- End diff --

Thanks for moving the method-specific documentation.  These doctests are 
method-specific too, though, so can you please move them as well?


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r180245120
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -127,13 +113,86 @@ class Correlation(object):
 def corr(dataset, column, method="pearson"):
 """
 Compute the correlation matrix with specified method using dataset.
+
+:param dataset:
+  A Dataset or a DataFrame.
+:param column:
+  The name of the column of vectors for which the correlation 
coefficient needs
+  to be computed. This must be a column of the dataset, and it 
must contain
+  Vector objects.
+:param method:
+  String specifying the method to use for computing correlation.
+  Supported: `pearson` (default), `spearman`.
+:return:
+  A DataFrame that contains the correlation matrix of the column 
of vectors. This
+  DataFrame contains a single row and a single column of name
+  '$METHODNAME($COLUMN)'.
 """
 sc = SparkContext._active_spark_context
 javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
 args = [_py2java(sc, arg) for arg in (dataset, column, method)]
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
+>>> dataset = [[-1.0], [0.0], [1.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
0.0, 1.0).first()
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+>>> dataset = [[2.0], [3.0], [4.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
3.0, 1.0).first()
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+
+.. versionadded:: 2.4.0
+
+"""
+@staticmethod
+@since("2.4.0")
+def test(dataset, sampleCol, distName, *params):
+"""
+Perform a Kolmogorov-Smirnov test using dataset.
--- End diff --

Can you please make this match the text in the Scala doc?


---




spark git commit: [SPARK-14681][ML] Provide label/impurity stats for spark.ml decision tree nodes

2018-04-09 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 7c1654e21 -> 252468a74


[SPARK-14681][ML] Provide label/impurity stats for spark.ml decision tree nodes

## What changes were proposed in this pull request?

API:
```
trait ClassificationNode extends Node
  def getLabelCount(label: Int): Double

trait RegressionNode extends Node
  def getCount(): Double
  def getSum(): Double
  def getSquareSum(): Double

// turn LeafNode to be trait
trait LeafNode extends Node {
  def prediction: Double
  def impurity: Double
  ...
}

class ClassificationLeafNode extends ClassificationNode with LeafNode

class RegressionLeafNode extends RegressionNode with LeafNode

// turn InternalNode to be trait
trait InternalNode extends Node{
  def gain: Double
  def leftChild: Node
  def rightChild: Node
  def split: Split
  ...
}

class ClassificationInternalNode extends ClassificationNode with InternalNode
  override def leftChild: ClassificationNode
  override def rightChild: ClassificationNode

class RegressionInternalNode extends RegressionNode with InternalNode
  override val leftChild: RegressionNode
  override val rightChild: RegressionNode

class DecisionTreeClassificationModel
  override val rootNode: ClassificationNode

class DecisionTreeRegressionModel
  override val rootNode: RegressionNode
```
Closes #17466

## How was this patch tested?

UT will be added soon.

Author: WeichenXu 
Author: jkbradley 

Closes #20786 from WeichenXu123/tree_stat_api_2.
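For illustration, a minimal sketch of traversing the new node types (names follow the API summary in this commit message; `train` is an assumed DataFrame with "label" and "features" columns):

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.tree.{ClassificationInternalNode, ClassificationLeafNode, ClassificationNode}

val model = new DecisionTreeClassifier().setMaxDepth(3).fit(train)

// rootNode is now typed as ClassificationNode instead of the generic Node,
// so per-label training counts are available at each node.
def describe(node: ClassificationNode, numClasses: Int): Unit = node match {
  case leaf: ClassificationLeafNode =>
    val counts = (0 until numClasses).map(leaf.getLabelCount)
    println(s"leaf: prediction=${leaf.prediction}, impurity=${leaf.impurity}, label counts=$counts")
  case internal: ClassificationInternalNode =>
    describe(internal.leftChild, numClasses)
    describe(internal.rightChild, numClasses)
}

describe(model.rootNode, model.numClasses)
```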


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/252468a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/252468a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/252468a7

Branch: refs/heads/master
Commit: 252468a744b95082400ba9e8b2e3b3d9d50ab7fa
Parents: 7c1654e
Author: WeichenXu 
Authored: Mon Apr 9 12:18:07 2018 -0700
Committer: Joseph K. Bradley 
Committed: Mon Apr 9 12:18:07 2018 -0700

--
 .../classification/DecisionTreeClassifier.scala |  14 +-
 .../spark/ml/classification/GBTClassifier.scala |   6 +-
 .../classification/RandomForestClassifier.scala |   6 +-
 .../ml/regression/DecisionTreeRegressor.scala   |  13 +-
 .../spark/ml/regression/GBTRegressor.scala  |   6 +-
 .../ml/regression/RandomForestRegressor.scala   |   6 +-
 .../scala/org/apache/spark/ml/tree/Node.scala   | 247 +++
 .../spark/ml/tree/impl/RandomForest.scala   |  10 +-
 .../org/apache/spark/ml/tree/treeModels.scala   |  36 ++-
 .../DecisionTreeClassifierSuite.scala   |  31 ++-
 .../ml/classification/GBTClassifierSuite.scala  |   4 +-
 .../RandomForestClassifierSuite.scala   |   5 +-
 .../regression/DecisionTreeRegressorSuite.scala |  14 ++
 .../spark/ml/tree/impl/RandomForestSuite.scala  |  22 +-
 .../apache/spark/ml/tree/impl/TreeTests.scala   |  12 +-
 project/MimaExcludes.scala  |   9 +-
 16 files changed, 333 insertions(+), 108 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/252468a7/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 65cce69..771cd4f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -165,7 +165,7 @@ object DecisionTreeClassifier extends 
DefaultParamsReadable[DecisionTreeClassifi
 @Since("1.4.0")
 class DecisionTreeClassificationModel private[ml] (
 @Since("1.4.0")override val uid: String,
-@Since("1.4.0")override val rootNode: Node,
+@Since("1.4.0")override val rootNode: ClassificationNode,
 @Since("1.6.0")override val numFeatures: Int,
 @Since("1.5.0")override val numClasses: Int)
   extends ProbabilisticClassificationModel[Vector, 
DecisionTreeClassificationModel]
@@ -178,7 +178,7 @@ class DecisionTreeClassificationModel private[ml] (
* Construct a decision tree classification model.
* @param rootNode  Root node of tree, with other nodes attached.
*/
-  private[ml] def this(rootNode: Node, numFeatures: Int, numClasses: Int) =
+  private[ml] def this(rootNode: ClassificationNode, numFeatures: Int, 
numClasses: Int) =
 this(Identifiable.randomUID("dtc"), rootNode, numFeatures, numClasses)
 
   override def predict(features: Vector): Double = {
@@ -276,8 +276,9 @@ object DecisionTreeClassificationModel extends 
MLReadable[DecisionTreeClassifica
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val 

[GitHub] spark issue #20786: [SPARK-14681][ML] Provide label/impurity stats for spark...

2018-04-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20786
  
LGTM
Merging with master
Thanks @WeichenXu123 !


---




[GitHub] spark issue #18227: [SPARK-21005][ML] Fix VectorIndexerModel does not prepar...

2018-04-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/18227
  
Just commented on the JIRA about this issue: 
https://issues.apache.org/jira/browse/SPARK-21005


---




[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string

2018-04-06 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20825
  
I actually would prefer not to merge this change since it could blow up the 
size of the strings printed for some classification tasks with large numbers of 
labels.  If people want to debug, they could trace through the tree manually.

Alternatively, I'd be OK with adding an optional argument which tells 
toDebugString to include the stats.


---




[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...

2018-04-06 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20319
  
@smurakozi Thanks for the PR!  I have bandwidth to review this now.  Do you 
have time to rebase this to fix the merge conflicts?


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179831482
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
+:param sampleCol:
+  Name of sample column in dataset, of any numerical type.
+:param distName:
+  a `string` name for a theoretical distribution, currently only 
support "norm".
+:param params:
+  a list of `Double` values specifying the parameters to be used for 
the theoretical
+  distribution
+:return:
+  A dataframe that contains the Kolmogorov-Smirnov test result for the 
input sampled data.
+  This DataFrame will contain a single Row with the following fields:
+  - `pValue: Double`
+  - `statistic: Double`
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
+>>> dataset = [[-1.0], [0.0], [1.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
0.0, 1.0).collect()[0]
--- End diff --

nit: use first() instead of collect()[0]


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179833156
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
+:param sampleCol:
+  Name of sample column in dataset, of any numerical type.
+:param distName:
+  a `string` name for a theoretical distribution, currently only 
support "norm".
+:param params:
+  a list of `Double` values specifying the parameters to be used for 
the theoretical
--- End diff --

I realized we should list what the parameters are, both here and in the 
Scala docs.


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179824556
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -102,10 +102,11 @@ object KolmogorovSmirnovTest {
*/
   @Since("2.4.0")
   @varargs
-  def test(dataset: DataFrame, sampleCol: String, distName: String, 
params: Double*): DataFrame = {
+  def test(dataset: Dataset[_], sampleCol: String, distName: String, 
params: Double*)
--- End diff --

nit: This doesn't fit scala style; please get familiar with the style we 
use for multi-line function headers!  Just check out other parts of MLlib for 
examples.


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179832593
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -81,7 +81,7 @@ object KolmogorovSmirnovTest {
* Java-friendly version of `test(dataset: DataFrame, sampleCol: String, 
cdf: Double => Double)`
*/
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String,
+  def test(dataset: Dataset[_], sampleCol: String,
 cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
--- End diff --

I guess I missed this before.  Would you mind fixing the scala style here 
too?


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179830986
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
--- End diff --

nit: dataset -> Dataset, dataframe -> DataFrame (It's nice to write class 
names the way they are defined.)


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179832114
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a continuous
+distribution.
+
+By comparing the largest difference between the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
--- End diff --

I see you're following the example of ChiSquareTest, but this Param 
documentation belongs with the test method, not the class.  Could you please 
shift it?  (Feel free to correct it for ChiSquareTest here or in another PR.)


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179824228
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -59,7 +59,7 @@ object KolmogorovSmirnovTest {
* distribution of the sample data and the theoretical distribution we 
can provide a test for the
* the null hypothesis that the sample data comes from that theoretical 
distribution.
*
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A dataset or a dataframe containing the sample of data 
to test
--- End diff --

nit: It's nicer to keep single back quotes ``` `DataFrame` ``` to make 
these show up as code in docs for clarity.  No need to get rid of that.


---




spark git commit: [SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID and logging levels

2018-04-06 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master c926acf71 -> d23a805f9


[SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID and logging 
levels

## What changes were proposed in this pull request?

Initial PR for Instrumentation improvements: UUID and logging levels.
This PR takes over #20837

Closes #20837

## How was this patch tested?

Manual.

Author: Bago Amirbekian 
Author: WeichenXu 

Closes #20982 from WeichenXu123/better-instrumentation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d23a805f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d23a805f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d23a805f

Branch: refs/heads/master
Commit: d23a805f975f209f273db2b52de3f336be17d873
Parents: c926acf
Author: Bago Amirbekian 
Authored: Fri Apr 6 10:09:55 2018 -0700
Committer: Joseph K. Bradley 
Committed: Fri Apr 6 10:09:55 2018 -0700

--
 .../ml/classification/LogisticRegression.scala  | 15 +---
 .../apache/spark/ml/util/Instrumentation.scala  | 40 
 2 files changed, 41 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d23a805f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 3ae4db3..ee4b010 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -517,6 +517,9 @@ class LogisticRegression @Since("1.2.0") (
 (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
   )(seqOp, combOp, $(aggregationDepth))
 }
+instr.logNamedValue(Instrumentation.loggerTags.numExamples, 
summarizer.count)
+instr.logNamedValue("lowestLabelWeight", 
labelSummarizer.histogram.min.toString)
+instr.logNamedValue("highestLabelWeight", 
labelSummarizer.histogram.max.toString)
 
 val histogram = labelSummarizer.histogram
 val numInvalid = labelSummarizer.countInvalid
@@ -560,15 +563,15 @@ class LogisticRegression @Since("1.2.0") (
   if (numInvalid != 0) {
 val msg = s"Classification labels should be in [0 to ${numClasses - 
1}]. " +
   s"Found $numInvalid invalid labels."
-logError(msg)
+instr.logError(msg)
 throw new SparkException(msg)
   }
 
   val isConstantLabel = histogram.count(_ != 0.0) == 1
 
   if ($(fitIntercept) && isConstantLabel && 
!usingBoundConstrainedOptimization) {
-logWarning(s"All labels are the same value and fitIntercept=true, so 
the coefficients " +
-  s"will be zeros. Training is not needed.")
+instr.logWarning(s"All labels are the same value and 
fitIntercept=true, so the " +
+  s"coefficients will be zeros. Training is not needed.")
 val constantLabelIndex = Vectors.dense(histogram).argmax
 val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures,
   new Array[Int](numCoefficientSets + 1), Array.empty[Int], 
Array.empty[Double],
@@ -581,7 +584,7 @@ class LogisticRegression @Since("1.2.0") (
 (coefMatrix, interceptVec, Array.empty[Double])
   } else {
 if (!$(fitIntercept) && isConstantLabel) {
-  logWarning(s"All labels belong to a single class and 
fitIntercept=false. It's a " +
+  instr.logWarning(s"All labels belong to a single class and 
fitIntercept=false. It's a " +
 s"dangerous ground, so the algorithm may not converge.")
 }
 
@@ -590,7 +593,7 @@ class LogisticRegression @Since("1.2.0") (
 
 if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
   featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
-  logWarning("Fitting LogisticRegressionModel without intercept on 
dataset with " +
+  instr.logWarning("Fitting LogisticRegressionModel without intercept 
on dataset with " +
 "constant nonzero column, Spark MLlib outputs zero coefficients 
for constant " +
 "nonzero columns. This behavior is the same as R glmnet but 
different from LIBSVM.")
 }
@@ -708,7 +711,7 @@ class LogisticRegression @Since("1.2.0") (
   (_initialModel.interceptVector.size == numCoefficientSets) &&
   (_initialModel.getFitIntercept == $(fitIntercept))
 if (!modelIsValid) {
-  logWarning(s"Initial coefficients will be ignored! Its 
dimensions " +
+  instr.logWarning(s"Initial coefficients will be ignored! Its 
dimensions " +
 s"(${providedCoefs.numRows},

[GitHub] spark issue #20982: [SPARK-23859][ML] Initial PR for Instrumentation improve...

2018-04-06 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20982
  
LGTM
Merging with master
Thanks @WeichenXu123  and @MrBago !


---




[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation

2018-04-06 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20837
  
We can close this issue now that it's been replaced by 
https://github.com/apache/spark/pull/20982


---




[GitHub] spark issue #20994: [SPARK-21898][ML][FOLLOWUP] Fix Scala 2.12 build.

2018-04-06 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20994
  
LGTM (assuming it builds with Scala 2.12 now?)


---




spark git commit: [SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssembler to handle invalid values in non-string columns

2018-04-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4807d381b -> f2ac08795


[SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssembler to 
handle invalid values in non-string columns

## What changes were proposed in this pull request?

`handleInvalid` Param was forwarded to the VectorAssembler used by RFormula.
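
For illustration, a hypothetical usage of the forwarded Param (the formula mirrors the new test below; `dataset` is an assumed DataFrame, not part of this patch):

```scala
import org.apache.spark.ml.feature.RFormula

// "error", "skip", or "keep" now also governs nulls in non-string columns,
// because RFormula forwards handleInvalid to its internal VectorAssembler.
val formula = new RFormula()
  .setFormula("c1 ~ id2")
  .setHandleInvalid("skip")
val output = formula.fit(dataset).transform(dataset).select("features", "label")
```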

## How was this patch tested?

added a test and ran all tests for RFormula and VectorAssembler

Author: Yogesh Garg 

Closes #20970 from yogeshg/spark_23562.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2ac0879
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2ac0879
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2ac0879

Branch: refs/heads/master
Commit: f2ac0879561cde63ed4eb759f5efa0a5ce393a22
Parents: 4807d38
Author: Yogesh Garg 
Authored: Thu Apr 5 19:55:42 2018 -0700
Committer: Joseph K. Bradley 
Committed: Thu Apr 5 19:55:42 2018 -0700

--
 .../org/apache/spark/ml/feature/RFormula.scala  |  1 +
 .../apache/spark/ml/feature/RFormulaSuite.scala | 23 
 2 files changed, 24 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2ac0879/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 22e7b8b..e214765 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -278,6 +278,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override 
val uid: String)
 encoderStages += new VectorAssembler(uid)
   .setInputCols(encodedTerms.toArray)
   .setOutputCol($(featuresCol))
+  .setHandleInvalid($(handleInvalid))
 encoderStages += new VectorAttributeRewriter($(featuresCol), 
prefixesToRewrite.toMap)
 encoderStages += new ColumnPruner(tempColumns.toSet)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f2ac0879/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
index 27d570f..a250331 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.feature
 
+import org.apache.spark.SparkException
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
@@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
 assert(features.toArray === a +: b.toArray)
 }
   }
+
+  test("SPARK-23562 RFormula handleInvalid should handle invalid values in 
non-string columns.") {
+val d1 = Seq(
+  (1001L, "a"),
+  (1002L, "b")).toDF("id1", "c1")
+val d2 = Seq[(java.lang.Long, String)](
+  (20001L, "x"),
+  (20002L, "y"),
+  (null, null)).toDF("id2", "c2")
+val dataset = d1.crossJoin(d2)
+
+def get_output(mode: String): DataFrame = {
+  val formula = new RFormula().setFormula("c1 ~ 
id2").setHandleInvalid(mode)
+  formula.fit(dataset).transform(dataset).select("features", "label")
+}
+
+assert(intercept[SparkException](get_output("error").collect())
+  .getMessage.contains("Encountered null while assembling a row"))
+assert(get_output("skip").count() == 4)
+assert(get_output("keep").count() == 6)
+  }
+
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] spark issue #20970: [SPARK-23870][ML] Forward RFormula handleInvalid Param t...

2018-04-05 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20970
  
LGTM
Merging with master
Thanks @yogeshg !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178988503
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer.

[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178984276
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer.

[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178991306
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer.

[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178992899
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+class PowerIterationClusteringSuite extends SparkFunSuite
+  with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  @transient var data: Dataset[_] = _
+  @transient var malData: Dataset[_] = _
+  final val r1 = 1.0
+  final val n1 = 10
+  final val r2 = 4.0
+  final val n2 = 40
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+
+data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, 
n1, n2)
+  }
+
+  test("default parameters") {
+val pic = new PowerIterationClustering()
+
+assert(pic.getK === 2)
+assert(pic.getMaxIter === 20)
+assert(pic.getInitMode === "random")
+assert(pic.getFeaturesCol === "features")
+assert(pic.getPredictionCol === "prediction")
+assert(pic.getIdCol === "id")
+assert(pic.getWeightCol === "weight")
+assert(pic.getNeighborCol === "neighbor")
+  }
+
+  test("set parameters") {
+val pic = new PowerIterationClustering()
+  .setK(9)
+  .setMaxIter(33)
+  .setInitMode("degree")
+  .setFeaturesCol("test_feature")
+  .setPredictionCol("test_prediction")
+  .setIdCol("test_id")
+  .setWeightCol("test_weight")
+  .setNeighborCol("test_neighbor")
+
+assert(pic.getK === 9)
+assert(pic.getMaxIter === 33)
+assert(pic.getInitMode === "degree")
+assert(pic.getFeaturesCol === "test_feature")
+assert(pic.getPredictionCol === "test_prediction")
+assert(pic.getIdCol === "test_id")
+assert(pic.getWeightCol === "test_weight")
+assert(pic.getNeighborCol === "test_neighbor")
+  }
+
+  test("parameters validation") {
+intercept[IllegalArgumentException] {
+  new PowerIterationClustering().setK(1)
+}
+intercept[IllegalArgumentException] {
+  new PowerIterationClustering().setInitMode("no_such_a_mode")
+}
+  }
+
+  test("power iteration clustering") {
+val n = n1 + n2
+
+val model = new PowerIterationClustering()
+  .setK(2)
+  .setMaxIter(40)
+val result = model.transform(data)
+
+val predictions = Array.fill(2)(mutable.Set.empty[Long])
+result.select("id", "prediction").collect().foreach {
+  case Row(id: Long, cluster: Integer) => predictions(cluster) += id
+}
+assert(predictions.toSet == Set((1 until n1).toSet, (n1 until 
n).toSet))
+
+val result2 = new PowerIterationClustering()
+  .setK(2)
+  .setMaxIter(10)
+  .setInitMode("degree")
+  .transform(data)
+val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
+result2.select("id", "prediction").collect().foreach {
+  case Row(id: Long, cluster: Integer) => predictions2(cluster) += id
+}
+assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until 
n).toSet))
+
+val expectedColumns = Array("id", "prediction")
--- End diff --

No need to check this since it's already checked above by result2.select(...)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178988149
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
--- End diff --

Also, featuresCol is not used, so it should be removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178991834
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+class PowerIterationClusteringSuite extends SparkFunSuite
+  with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  @transient var data: Dataset[_] = _
+  @transient var malData: Dataset[_] = _
--- End diff --

Not used


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178987751
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
--- End diff --

nit: No need for doc like this; it's already explained by the method name


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178987675
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.{Vector}
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.{col}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.2.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.2.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.2.0")
+  final val initMode = new Param[String](this, "initMode", "The 
initialization algorithm. " +
+"Supported options: 'random' and 'degree'.",
+(value: String) => validateInitMode(value))
+
+  private[spark] def validateInitMode(initMode: String): Boolean = {
+initMode match {
+  case "random" => true
+  case "degree" => true
+  case _ => false
+}
+  }
+
+  /** @group expertGetParam */
+  @Since("2.2.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
[[PowerIterationClustering.transform()]].
+   * Default: "id"
+   * @group param
+   */
+  val idCol = new Param[String](this, "idCol", "column name for ids.")
+
+  /** @group getParam */
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
--- End diff --

+1
Also:
* This should check other input columns to make sure they are defined.
* This should add predictionCol, not check that it exists in the input.
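
Concretely, a rough sketch of that shape (names and column types are assumed, not from the PR; ArrayType would need an extra import in this file):

```scala
protected def validateAndTransformSchema(schema: StructType): StructType = {
  // check the columns this transformer actually reads
  SchemaUtils.checkColumnType(schema, $(idCol), LongType)
  SchemaUtils.checkColumnType(schema, $(neighborCol), ArrayType(LongType, containsNull = false))
  // append the prediction column instead of requiring it in the input
  SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
}
```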


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178987121
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
--- End diff --

We should not use weightCol, which is for instance weights, not for this 
kind of adjacency.  Let's add a new Param here, perhaps called 
neighborWeightCol.
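
For concreteness, a possible shape for such a Param (name per the suggestion above; the doc text and Since version are illustrative only):

```scala
/**
 * Param for the column name of neighbor weights (similarities), one weight per neighbor.
 * @group param
 */
@Since("2.4.0")
final val neighborWeightCol = new Param[String](this, "neighborWeightCol",
  "column name for the weights (similarities) of the neighbors.")

/** @group getParam */
@Since("2.4.0")
def getNeighborWeightCol: String = $(neighborWeightCol)
```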


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15770#discussion_r178983843
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => 
MLlibPowerIterationClustering}
+import 
org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params 
with HasMaxIter
+  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+  /**
+   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. 
" +
+"Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" 
to use a random vector
+   * as vertex properties, or "degree" to use normalized sum similarities. 
Default: random.
+   */
+  @Since("2.3.0")
+  final val initMode = {
+val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+new Param[String](this, "initMode", "The initialization algorithm. " +
+  "Supported options: 'random' and 'degree'.", allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.3.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the column name for ids returned by 
PowerIterationClustering.transform().
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "id", "column name for ids.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getIdCol: String = $(idCol)
+
+  /**
+   * Param for the column name for neighbors required by 
PowerIterationClustering.transform().
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighbor", "column name for 
neighbors.")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getNeighborCol: String = $(neighborCol)
+
+  /**
+   * Validates the input schema
+   * @param schema input schema
+   */
+  protected def validateSchema(schema: StructType): Unit = {
+SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm 
developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated 
power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer.

[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2018-04-03 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/15770
  
Just pinged @yanboliang on JIRA about me taking over shepherding this.  It 
will need at least one update: change Since versions from 2.3.0 to 2.4.0.  
Sorry for the long wait @wangmiao1981  : (


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r178956696
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -351,27 +359,90 @@ private[ml] object DefaultParamsReader {
   timestamp: Long,
   sparkVersion: String,
   params: JValue,
+  defaultParams: JValue,
   metadata: JValue,
   metadataJson: String) {
 
+
+private def getValueFromParams(params: JValue): Seq[(String, JValue)] 
= {
+  params match {
+case JObject(pairs) => pairs
+case _ =>
+  throw new IllegalArgumentException(
+s"Cannot recognize JSON metadata: $metadataJson.")
+  }
+}
+
 /**
  * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of 
the given name.
  * This can be useful for getting a Param value before an instance of 
`Params`
- * is available.
+ * is available. This will look up `params` first, if not existing 
then looking up
+ * `defaultParams`.
  */
 def getParamValue(paramName: String): JValue = {
   implicit val format = DefaultFormats
-  params match {
+
+  // Looking up for `params` first.
+  var pairs = getValueFromParams(params)
+  var foundPairs = pairs.filter { case (pName, jsonValue) =>
+pName == paramName
+  }
+  if (foundPairs.length == 0) {
+// Looking up for `defaultParams` then.
+pairs = getValueFromParams(defaultParams)
+foundPairs = pairs.filter { case (pName, jsonValue) =>
+  pName == paramName
+}
+  }
+  assert(foundPairs.length == 1, s"Expected one instance of Param 
'$paramName' but found" +
+s" ${foundPairs.length} in JSON Params: " + 
pairs.map(_.toString).mkString(", "))
+
+  foundPairs.map(_._2).head
+}
+
+/**
+ * Extract Params from metadata, and set them in the instance.
+ * This works if all Params (except params included by `skipParams` 
list) implement
+ * [[org.apache.spark.ml.param.Param.jsonDecode()]].
+ *
+ * @param skipParams The params included in `skipParams` won't be set. 
This is useful if some
+ *   params don't implement 
[[org.apache.spark.ml.param.Param.jsonDecode()]]
+ *   and need special handling.
+ */
+def getAndSetParams(
+instance: Params,
+skipParams: Option[List[String]] = None): Unit = {
+  setParams(instance, skipParams, isDefault = false)
+
+  // For metadata file prior to Spark 2.4, there is no default section.
+  val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion)
+  if (major >= 2 && minor >= 4) {
--- End diff --

This should be:
```if (major > 2 || (major == 2 && minor >= 4)) {```
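
For reference, a quick illustration of why the original `major >= 2 && minor >= 4` check is insufficient (the version string is hypothetical):

```scala
import org.apache.spark.util.VersionUtils

val (major, minor) = VersionUtils.majorMinorVersion("3.0.0")  // (3, 0)
// original check: major >= 2 && minor >= 4  -> false, so a 3.0 metadata file
// would be treated as pre-2.4 and its default-params section skipped
assert(major > 2 || (major == 2 && minor >= 4))               // fixed check -> true
```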


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r178955058
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -905,6 +905,15 @@ trait Params extends Identifiable with Serializable {
   }
 }
 
+object Params {
--- End diff --

Shall we make this object package private (for cleaner docs)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r178955105
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -791,7 +791,7 @@ trait Params extends Identifiable with Serializable {
*   this method gets called.
* @param value  the default value
*/
-  protected final def setDefault[T](param: Param[T], value: T): this.type 
= {
+  private[ml] final def setDefault[T](param: Param[T], value: T): 
this.type = {
--- End diff --

Makes sense; I like your solution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20970: [SPARK-23562][ML] Forward RFormula handleInvalid Param t...

2018-04-03 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20970
  
Side note: In general, when there is a JIRA with subtasks, it's nice to 
send all PRs against subtasks, rather than against the parent task.  This helps 
when we later come up with additional subtasks, such as adding the 
Python API for VectorAssembler handleInvalid.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20970#discussion_r178938488
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala ---
@@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
 assert(features.toArray === a +: b.toArray)
 }
   }
+
+  test("SPARK-23562 RFormula handleInvalid should handle invalid values in 
non-string columns.") {
+val d1 = Seq(
+  (1001L, "a"),
+  (1002L, "b")).toDF("id1", "c1")
+val d2 = Seq[(java.lang.Long, String)](
+  (20001L, "x"),
+  (20002L, "y"),
+  (null, null)).toDF("id2", "c2")
+val dataset = d1.crossJoin(d2)
+
+def get_output(mode: String): DataFrame = {
+  val formula = new RFormula().setFormula("c1 ~ 
id2").setHandleInvalid(mode)
+  formula.fit(dataset).transform(dataset).select("features", "label")
+}
+
+intercept[SparkException](get_output("error").collect())
+  .getMessage contains "Encountered null while assembling a row"
--- End diff --

Also, did you mean to assert that contains returns true?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20970#discussion_r178938415
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala ---
@@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
 assert(features.toArray === a +: b.toArray)
 }
   }
+
+  test("SPARK-23562 RFormula handleInvalid should handle invalid values in 
non-string columns.") {
+val d1 = Seq(
+  (1001L, "a"),
+  (1002L, "b")).toDF("id1", "c1")
+val d2 = Seq[(java.lang.Long, String)](
+  (20001L, "x"),
+  (20002L, "y"),
+  (null, null)).toDF("id2", "c2")
+val dataset = d1.crossJoin(d2)
+
+def get_output(mode: String): DataFrame = {
+  val formula = new RFormula().setFormula("c1 ~ 
id2").setHandleInvalid(mode)
+  formula.fit(dataset).transform(dataset).select("features", "label")
+}
+
+intercept[SparkException](get_output("error").collect())
+  .getMessage contains "Encountered null while assembling a row"
--- End diff --

We try to avoid using infix notation.  (We tend towards Java-like use of 
Scala for simplicity.)
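
i.e., something along these lines (this matches what the merged version of the test uses):

```scala
assert(intercept[SparkException](get_output("error").collect())
  .getMessage.contains("Encountered null while assembling a row"))
```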


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20837: [SPARK-23686][ML][WIP] Better instrumentation

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20837#discussion_r178905201
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 ---
@@ -517,6 +517,9 @@ class LogisticRegression @Since("1.2.0") (
 (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
   )(seqOp, combOp, $(aggregationDepth))
 }
+instr.logNamedValue(Instrumentation.loggerTags.numExamples, 
summarizer.count)
+instr.logNamedValue("lowestLabelWeight", 
labelSummarizer.histogram.min.toString)
+instr.logNamedValue("highestLabelWeight", 
labelSummarizer.histogram.min.toString)
--- End diff --

I'm OK with not logging the full histogram here.  There's a typo, where 
"highestLabelWeight" is actually logging the min (not max)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20786: [SPARK-14681][ML] Provide label/impurity stats for spark...

2018-04-03 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20786
  
Thanks for the updates!  I had a spacing typo in the fromOld() style fix I 
wrote above, and there are some traits which still need to be sealed.  Hope you 
don't mind, but I sent a PR to be explicit here: 
https://github.com/WeichenXu123/spark/pull/5


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20964#discussion_r178896682
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala ---
@@ -167,4 +166,20 @@ class MinHashLSHSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
 assert(precision == 1.0)
 assert(recall >= 0.7)
   }
+
+  test("MinHashLSHModel.transform should work with Structured Streaming") {
+val localSpark = spark
+import localSpark.implicits._
+
+val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
+model.set(model.inputCol, "keys")
+testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", 
model.getOutputCol) {
+  case Row(_: Vector, output: Seq[_]) =>
+assert(output.length === model.randCoefficients.length)
+// no AND-amplification yet: SPARK-18450, so each hash output is 
of length 1
+output.foreach {
+  case hashOutput: Vector => assert(hashOutput.size === 1)
+}
+}
--- End diff --

I don't think that's necessary for testing that this works with structured 
streaming.  (I can't see how streaming would mess up the correctness of the 
algorithm.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20964#discussion_r178896432
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala ---
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Default
 ImputerSuite.iterateStrategyTest(imputer, df)
   }
 
+  test("Imputer should work with Structured Streaming") {
+val localSpark = spark
+import localSpark.implicits._
+val df = Seq[(java.lang.Double, Double)](
+  (4.0, 4.0),
+  (10.0, 10.0),
+  (10.0, 10.0),
+  (Double.NaN, 8.0),
+  (null, 8.0)
+).toDF("value", "expected_mean_value")
--- End diff --

since it's nullable


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20964#discussion_r178894229
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala ---
@@ -48,8 +46,8 @@ class MinMaxScalerSuite extends SparkFunSuite with 
MLlibTestSparkContext with De
   .setMax(5)
 
 val model = scaler.fit(df)
-model.transform(df).select("expected", "scaled").collect()
-  .foreach { case Row(vector1: Vector, vector2: Vector) =>
+testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+  case Row(vector1: Vector, vector2: Vector) =>
 assert(vector1.equals(vector2), "Transformed vector is different 
with expected.")
--- End diff --

True, === is more standard


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-03 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20964#discussion_r178893936
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala 
---
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest 
{
 
   def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
 testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", 
"wantedNGrams") {
-  case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
+  case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
--- End diff --

String is not actually checked because of erasure, so IntelliJ complained 
with a style warning before this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23690][ML] Add handleinvalid to VectorAssembler

2018-04-02 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 28ea4e314 -> a1351828d


[SPARK-23690][ML] Add handleinvalid to VectorAssembler

## What changes were proposed in this pull request?

Introduce a `handleInvalid` parameter in `VectorAssembler` that accepts the options 
`"keep"`, `"skip"`, and `"error"`. "error" throws an exception on seeing a row 
containing a `null`, "skip" filters out all such rows, and "keep" pads the output with 
the appropriate number of NaNs. For "keep", that number is inferred by inspecting an 
example row, and an error is thrown when no such number can be determined.
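
A hypothetical usage sketch (the column names and the `df` DataFrame are assumptions, not from this patch):

```scala
import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income"))
  .setOutputCol("features")
  .setHandleInvalid("keep")   // "error", "skip", or "keep"
// under "keep", rows containing nulls get NaN entries in the assembled vector
val assembled = assembler.transform(df)
```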

## How was this patch tested?

Unit tests are added to check the behavior of `assemble` on specific rows and 
the transformer is called on `DataFrame`s of different configurations to test 
different corner cases.

Author: Yogesh Garg 
Author: Bago Amirbekian 
Author: Yogesh Garg <1059168+yoge...@users.noreply.github.com>

Closes #20829 from yogeshg/rformula_handleinvalid.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1351828
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1351828
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1351828

Branch: refs/heads/master
Commit: a1351828d376a01e5ee0959cf608f767d756dd86
Parents: 28ea4e3
Author: Yogesh Garg 
Authored: Mon Apr 2 16:41:26 2018 -0700
Committer: Joseph K. Bradley 
Committed: Mon Apr 2 16:41:26 2018 -0700

--
 .../apache/spark/ml/feature/StringIndexer.scala |   2 +-
 .../spark/ml/feature/VectorAssembler.scala  | 198 +++
 .../spark/ml/feature/VectorAssemblerSuite.scala | 131 ++--
 3 files changed, 284 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a1351828/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 1cdcdfc..67cdb09 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
+val (filteredDataset, keepInvalid) = $(handleInvalid) match {
   case StringIndexer.SKIP_INVALID =>
 val filterer = udf { label: String =>
   labelToIndex.contains(label)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1351828/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index b373ae9..6bf4aa3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.ml.feature
 
-import scala.collection.mutable.ArrayBuilder
+import java.util.NoSuchElementException
+
+import scala.collection.mutable
+import scala.language.existentials
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, 
NumericAttribute, UnresolvedAttribute}
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
-import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -33,10 +36,14 @@ import org.apache.spark.sql.types._
 
 /**
  * A feature transformer that merges multiple columns into a vector column.
+ *
+ * This requires one pass over the entire dataset. In case we need to infer 
column lengths from the
+ * data we require an additional call to the 'first' Dataset method, see 
'handleInvalid' parameter.
  */
 @Since("1.4.0")
 class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: 
String)
-  extends Transformer with HasInputCols with HasOutputCol with 
DefaultParamsWritable {
+  extends Transformer with HasInputCols with HasOutputCol with HasHandleInvalid
+with DefaultParamsWritable {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("vecAssembler"))
@@ -49,32 +56,63 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: 

[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler

2018-04-02 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20829
  
LGTM
Merging with master
Thanks @yogeshg for the PR and @WeichenXu123 for taking a look!


---




[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...

2018-04-02 Thread jkbradley
GitHub user jkbradley opened a pull request:

https://github.com/apache/spark/pull/20964

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkbradley/spark SPARK-22883-part2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20964.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20964


commit c74cd89c220ea6a679a927c3d3caf1c4f3e24c0b
Author: Joseph K. Bradley 
Date:   2018-03-30T17:44:31Z

partly done

commit 26f7519e31853b2abc8807e32eacd015164dd44d
Author: Joseph K. Bradley 
Date:   2018-04-02T23:34:28Z

remaining tests

commit 778e822ad6ef8e029dc074981ae396feafed2962
Author: Joseph K. Bradley 
Date:   2018-04-02T23:39:10Z

reverted MLTest change




---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178676396
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

Oh, I see; you're right.  Does this mean the latest change removes the 
space though?
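
For reference, a minimal sketch of the string behavior in question (toy text, not 
the actual error message): the trailing `|` line is what leaves a space after 
`replaceAll`, which then separates the message from `e.toString`.

```
val withTrailingBar =
  """Encountered null value while inferring lengths from the first row.
    |Consider using VectorSizeHint.
    |""".stripMargin.replaceAll("\n", " ")
// "... Consider using VectorSizeHint. "   <- ends with a space

val withoutTrailingBar =
  """Encountered null value while inferring lengths from the first row.
    |Consider using VectorSizeHint.""".stripMargin.replaceAll("\n", " ")
// "... Consider using VectorSizeHint."    <- no trailing space

// Appending e.toString to the second form would run the two together.
```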


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620282
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), 
missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider 
using
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620200
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620104
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620142
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178619977
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
--- End diff --

Ditto


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178619893
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
--- End diff --

This still isn't correct; look at other MLlib code for examples.  It should 
be:
```
private[feature] def getVectorLengthsFromFirstRow(
dataset: Dataset[_],
columns: Seq[String]): Map[String, Int] = {
```


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178202438
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,85 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node
+   * @param label label number in the range [0, numClasses)
+   */
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length,
+  "label should be in the rangle between 0 (inclusive) " +
--- End diff --

rangle -> range


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178202596
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,85 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node
+   * @param label label number in the range [0, numClasses)
+   */
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length,
+  "label should be in the rangle between 0 (inclusive) " +
+  s"and ${impurityStats.stats.length} (exclusive).")
+impurityStats.stats(label)
+  }
+}
+
+@Since("2.4.0")
+trait RegressionNode extends Node {
+
+  /** Number of data points in this node */
--- End diff --

"data points" -> "training data points" (for other methods too)


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178204351
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,85 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node
+   * @param label label number in the range [0, numClasses)
+   */
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length,
+  "label should be in the rangle between 0 (inclusive) " +
+  s"and ${impurityStats.stats.length} (exclusive).")
+impurityStats.stats(label)
+  }
+}
+
+@Since("2.4.0")
+trait RegressionNode extends Node {
+
+  /** Number of data points in this node */
+  @Since("2.4.0")
+  def getCount: Double = impurityStats.stats(0)
+
+  /** Sum of data points labels in this node */
+  @Since("2.4.0")
+  def getSum: Double = impurityStats.stats(1)
+
+  /** Sum of data points label squares in this node */
+  @Since("2.4.0")
+  def getSumOfSquares: Double = impurityStats.stats(2)
+}
+
+@Since("2.4.0")
+trait LeafNode extends Node {
+
+  /** Prediction this node makes. */
+  def prediction: Double
+
+  /** Impurity measure at this node (for training data) */
+  def impurity: Double
--- End diff --

This can be omitted since the docstring matches that in Node.


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178202260
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
--- End diff --

For multi-line class and method headers, put the first argument on the next 
line, and each subsequent argument on its own line.  Just look at other code 
examples in MLlib.  This should be:
```
def fromOld(
  oldNode: OldNode,
  categoricalFeatures: Map[Int, Int],
  isClassification: Boolean): Node = {
```


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178202685
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,85 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node
+   * @param label label number in the range [0, numClasses)
+   */
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length,
+  "label should be in the rangle between 0 (inclusive) " +
+  s"and ${impurityStats.stats.length} (exclusive).")
+impurityStats.stats(label)
+  }
+}
+
+@Since("2.4.0")
+trait RegressionNode extends Node {
+
+  /** Number of data points in this node */
+  @Since("2.4.0")
+  def getCount: Double = impurityStats.stats(0)
+
+  /** Sum of data points labels in this node */
+  @Since("2.4.0")
+  def getSum: Double = impurityStats.stats(1)
+
+  /** Sum of data points label squares in this node */
--- End diff --

"Sum of data points label squares" -> "Sum over training data points of the 
square of the labels"


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178204775
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -17,9 +17,11 @@
 
 package org.apache.spark.ml.tree
 
+import org.apache.spark.annotation.Since
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
-import org.apache.spark.mllib.tree.model.{ImpurityStats, 
InformationGainStats => OldInformationGainStats, Node => OldNode, Predict => 
OldPredict}
+import org.apache.spark.mllib.tree.model.{ImpurityStats, 
InformationGainStats => OldInformationGainStats,
+  Node => OldNode, Predict => OldPredict}
 
 /**
  * Decision tree node interface.
--- End diff --

We should make Node a trait, not an abstract class.  I didn't realize that 
traits could inherit from abstract classes; weird...
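
For reference, a minimal, self-contained sketch (simplified names, not the actual 
Spark ML hierarchy) showing a trait extending an abstract class; such a trait can 
then only be mixed into subclasses of that class:

```
abstract class Node {
  def prediction: Double
  def impurity: Double
}

// A trait may extend a class; it is then only mixable into Node subclasses.
trait ClassificationNode extends Node {
  def getLabelCount(label: Int): Double
}

class ClassificationLeafNode(
    override val prediction: Double,
    override val impurity: Double,
    labelCounts: Array[Double]) extends ClassificationNode {
  override def getLabelCount(label: Int): Double = labelCounts(label)
}

val leaf = new ClassificationLeafNode(1.0, 0.0, Array(2.0, 8.0))
println(leaf.getLabelCount(1))  // 8.0
```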


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-29 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r178202528
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,85 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node
--- End diff --

"count" -> "count of training examples"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177542325
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543971
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
--- End diff --

nit: In Scala, use camelCase: first_sizes -> firstSizes


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177500915
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -17,26 +17,32 @@
 
 package org.apache.spark.ml.feature
 
+import java.util.NoSuchElementException
+
 import scala.collection.mutable.ArrayBuilder
+import scala.language.existentials
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, 
NumericAttribute, UnresolvedAttribute}
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
-import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 /**
- * A feature transformer that merges multiple columns into a vector column.
+ * A feature transformer that merges multiple columns into a vector 
column. This requires one pass
--- End diff --

style nit: Move new text here into a new paragraph below.  That will give 
nicer "pyramid-style" formatting with essential info separated from details.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177542280
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177505970
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, 
vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
 case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
+  val attribute = Attribute.fromStructField(field)
+  attribute match {
+case UnresolvedAttribute =>
+  Seq(NumericAttribute.defaultAttr.withName(c))
+case _ =>
+  Seq(attribute.withName(c))
   }
 case _: NumericType | BooleanType =>
   // If the input column type is a compatible scalar type, assume 
numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
+  Seq(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
--- End diff --

for the future, I'd avoid renaming things like this unless it's really 
unclear or needed (to make diffs shorter)


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177503206
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
--- End diff --

nit: Is toSeq extraneous?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177501836
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

I'd recommend we deal with NaNs now.  This PR is already dealing with some 
NaN cases: Dataset.na.drop handles NaNs in NumericType columns (but not 
VectorUDT columns).

I'm OK with postponing the handling of incorrect vector lengths until later, or 
doing it now, since that work is more self-contained.
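
For reference, a small standalone sketch of the `na.drop` behavior mentioned here 
(toy column names; assumes an active SparkSession `spark`):

```
import spark.implicits._

// Rows with null or NaN in the listed numeric columns are dropped...
val df = Seq(
  (1.0, 2.0),
  (Double.NaN, 3.0),
  (4.0, 5.0)
).toDF("a", "b")

df.na.drop(Seq("a")).show()  // keeps only the rows with a = 1.0 and a = 4.0

// ...but na.drop does not look inside VectorUDT columns, so NaNs inside
// vectors need separate handling.
```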


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177547373
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), 
missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider 
using
--- End diff --

nit: Refer to the specific Param value: "Cannot infer column lengths for mode 
handleInvalid = 'keep'"
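
(Sketch of how that message might then read; `columns` is the Seq[String] already in 
scope in getLengths, and the wording follows the suggestion above.)

    throw new RuntimeException(
      s"""Cannot infer column lengths for mode handleInvalid = "keep". Consider using
         |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
         |""".stripMargin.replaceAll("\n", " "))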


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543316
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543289
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

nit: Put a space between this message and e.toString
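
(Sketch only, with a hypothetical helper name `describeFailure`: the message and the 
original exception text separated by a space, per the nit.)

    def describeFailure(columns: Seq[String], e: Throwable): String =
      s"""Encountered null value while inferring lengths from the first row. Consider using
         |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
         |""".stripMargin.replaceAll("\n", " ") + " " + e.toString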


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177559339
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+
assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"),
 Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+
assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1
 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == 
"y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), 
Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), 
Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = 
"true"): Dataset[_] = {
--- End diff --

style: use camelCase
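
(A sketch of the rename per the style nit; only the names change, and the body is 
elided. Assumes the same imports as the test file.)

    import org.apache.spark.sql.Dataset

    // camelCase for the helper and its parameter:
    def runWithMetadata(mode: String, additionalFilter: String = "true"): Dataset[_] = {
      // ... same body as run_with_metadata above ...
      ???
    }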


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177559587
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+
assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"),
 Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+
assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1
 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == 
"y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), 
Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), 
Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = 
"true"): Dataset[_] = {
+  val attributeY = new AttributeGroup("y", 2)
+  val subAttributesOfZ = Array(NumericAttribute.defaultAttr, 
NumericAttribute.defaultAttr)
--- End diff --

unused


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177547735
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), 
missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider 
using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " "))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  /**
+   * Returns a UDF that has the required information to assemble each row.
--- End diff --

nit: When people say "UDF," they generally mean a Spark SQL UDF.  This is 
just a function, not a SQL UDF.
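
(A minimal, generic illustration of that distinction, unrelated to VectorAssembler 
itself:)

    import org.apache.spark.sql.functions.udf

    // A plain Scala function: callable directly, but not applicable to Columns.
    val double: Double => Double = x => x * 2.0
    double(3.0)                       // 6.0

    // It only becomes a Spark SQL UDF once wrapped with functions.udf:
    val doubleUDF = udf(double)
    // df.select(doubleUDF(df("x"))) // now usable on a DataFrame column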


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177504904
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, 
vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
--- End diff --

I think the flatMap is simpler, or at least a more common pattern in Spark 
and Scala (rather than having nested sequences which are then flattened).
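
(A toy illustration of the two patterns, not the PR's code; `attributesFor` is a 
stand-in for the per-column attribute lookup.)

    // Hypothetical inputs, for illustration only.
    val inputCols = Seq("x", "y")
    def attributesFor(c: String): Seq[String] = Seq(s"${c}_0", s"${c}_1")

    // Nested sequences that are then flattened:
    val nested: Seq[Seq[String]] = inputCols.map(attributesFor)
    val flattened: Seq[String]   = nested.flatten

    // The flatMap form suggested above: one pass, no intermediate nesting.
    val direct: Seq[String] = inputCols.flatMap(attributesFor)
    // flattened == direct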


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177558064
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -18,56 +18,68 @@
 package org.apache.spark.ml.feature
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, 
NumericAttribute}
+import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, 
NominalAttribute, NumericAttribute}
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, 
Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.functions.{col, udf}
 
 class VectorAssemblerSuite
   extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
 
   import testImplicits._
 
+  @transient var dfWithNulls: Dataset[_] = _
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, 
Vector, Long, String)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L, null),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L, 
null),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L, null),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L, 
null))
+  .toDF("id1", "id2", "x", "y", "name", "z", "n", "nulls")
+  }
+
   test("params") {
 ParamsSuite.checkParams(new VectorAssembler)
   }
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Array(1), true)(0.0) === Vectors.sparse(1, 
Array.empty, Array.empty))
+assert(assemble(Array(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, 
Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), 
Array(2.0, 1.0)))
+assert(assemble(Array(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Array(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Array(1), true)(v))
+  intercept[SparkException](assemble(Array(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Array(1, 1, 1, 1), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
 assert(v1.isInstanceOf[SparseVector])
-val v2 = assemble(1.0, 2.0, 3.0, Vectors.sparse(1, Array(0), 
Array(4.0)))
+val sv = Vectors.sparse(1, Array(0), Array(4.0))
+val v2 = assemble(Array(1, 1, 1, 1), true)(1.0, 2.0, 3.0, sv)
 assert(v2.isInstanceOf[DenseVector])
   }
 
   test("VectorAssembler") {
-val df = Seq(
-  (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), 
Array(3.0)), 10L)
-).toDF("id", "x", "y", "name", "z", "n")
+val df = dfWithNulls.filter("id1 == 1").withColumn("id", col("id1"))
--- End diff --

nit: If this is for consolidation, I'm actually against this little change 
since it obscures what this test is doing and moves the input Row farther from 
the expected output row.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543627
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
--- End diff --

style: state the return type explicitly (this isn't completely consistent across 
Spark, but we try to in MLlib)
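
(Sketch of the declaration with the return type spelled out; the body is unchanged and 
elided here. Based on the diff above, the type would be Map[String, Int].)

    import org.apache.spark.sql.Dataset

    private[feature] def getLengths(
        dataset: Dataset[_],
        columns: Seq[String],
        handleInvalid: String): Map[String, Int] = {
      // ... same body as in the diff ...
      ???
    }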


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177560225
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+
assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"),
 Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+
assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1
 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == 
"y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), 
Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), 
Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = 
"true"): Dataset[_] = {
+  val attributeY = new AttributeGroup("y", 2)
+  val subAttributesOfZ = Array(NumericAttribute.defaultAttr, 
NumericAttribute.defaultAttr)
+  val attributeZ = new AttributeGroup(
+"z",
+Array[Attribute](
+  NumericAttribute.defaultAttr.withName("foo"),
+  NumericAttribute.defaultAttr.withName("bar")))
+  val dfWithMetadata = dfWithNulls.withColumn("y", col("y"), 
attributeY.toMetadata())
+.withColumn("z", col("z"), 
attributeZ.toMetadata()).filter(additional_filter)
+  val output = 
assembler.setHandleInvalid(mode).transform(dfWithMetadata)
+  output.collect()
+  output
+}
+def run_with_first_row(mode: String): Dataset[_] = {
--- End diff --

style: Put empty line between functions


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite

2018-03-21 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 2b89e4aa2 -> a091ee676


[MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite

## What changes were proposed in this pull request?

Fix lint-java errors introduced by the addition of JavaKolmogorovSmirnovTestSuite in 
https://github.com/apache/spark/pull/19108

Author: Joseph K. Bradley 

Closes #20875 from jkbradley/kstest-lint-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a091ee67
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a091ee67
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a091ee67

Branch: refs/heads/master
Commit: a091ee676b8707819e94d92693956237310a6145
Parents: 2b89e4a
Author: Joseph K. Bradley 
Authored: Wed Mar 21 13:52:03 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Mar 21 13:52:03 2018 -0700

--
 .../apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java  | 7 ---
 1 file changed, 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a091ee67/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
 
b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
index 021272d..830f668 100644
--- 
a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java
@@ -18,18 +18,11 @@
 package org.apache.spark.ml.stat;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.math3.distribution.NormalDistribution;
-import org.apache.spark.ml.linalg.VectorUDT;
-import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.types.DoubleType;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.junit.Test;
 
 import org.apache.spark.SharedSparkSession;


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] spark issue #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTest...

2018-03-21 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20875
  
Merging with master
Thanks for checking @adrian-ionescu !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTest...

2018-03-21 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20875
  
CC @WeichenXu123 @MrBago @adrian-ionescu 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


