[GitHub] spark pull request #21042: [SPARK-22883][ML] ML test for StructuredStreaming...
Github user jkbradley closed the pull request at:

https://github.com/apache/spark/pull/21042

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-22883][ML] ML test for StructuredStreaming: spark.ml.feature, I-M
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 320269e49 -> acfc156df

[SPARK-22883][ML] ML test for StructuredStreaming: spark.ml.feature, I-M

This backports https://github.com/apache/spark/pull/20964 to branch-2.3.

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley

Closes #21042 from jkbradley/SPARK-22883-part2-2.3backport.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acfc156d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acfc156d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acfc156d

Branch: refs/heads/branch-2.3
Commit: acfc156df551632007a47b7ec7a7c901a713082d
Parents: 320269e
Author: Joseph K. Bradley
Authored: Wed Apr 11 11:41:50 2018 -0700
Committer: Joseph K. Bradley
Committed: Wed Apr 11 11:41:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/feature/IDFSuite.scala  | 14 +++---
 .../apache/spark/ml/feature/ImputerSuite.scala  | 31 ++---
 .../spark/ml/feature/InteractionSuite.scala     | 46 ++--
 .../spark/ml/feature/MaxAbsScalerSuite.scala    | 14 +++---
 .../spark/ml/feature/MinHashLSHSuite.scala      | 25 ---
 .../spark/ml/feature/MinMaxScalerSuite.scala    | 14 +++---
 .../apache/spark/ml/feature/NGramSuite.scala    |  2 +-
 7 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/acfc156d/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
 package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row

-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {

   import testImplicits._

@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       math.log((numOfData + 1.0) / (x + 1.0))
     })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     MLTestingUtils.checkCopyAndUids(idfEst, idfModel)

-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
     })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       .setMinDocFreq(1)
       .fit(df)

-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/acfc156d/mllib/src/test/scala/org/apache/spark
[GitHub] spark issue #21042: [SPARK-22883][ML] ML test for StructuredStreaming: spark...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21042

Since this had a LGTM for the original PR and has no changes and tests pass, I'll merge this with branch-2.3
[GitHub] spark issue #21042: [SPARK-22883][ML] ML test for StructuredStreaming: spark...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21042

I haven't made any changes from the PR which was merged into master in https://github.com/apache/spark/pull/20964
[GitHub] spark pull request #21042: [SPARK-22883] ML test for StructuredStreaming: sp...
GitHub user jkbradley opened a pull request:

https://github.com/apache/spark/pull/21042

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

This backports https://github.com/apache/spark/pull/20964 to branch-2.3.

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jkbradley/spark SPARK-22883-part2-2.3backport

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21042.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21042

commit a4209436f177ee59ee39e1c715694f533ecc3d36
Author: Joseph K. Bradley
Date: 2018-04-11T16:59:38Z

    [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

    ## What changes were proposed in this pull request?

    Adds structured streaming tests using testTransformer for these suites:
    * IDF
    * Imputer
    * Interaction
    * MaxAbsScaler
    * MinHashLSH
    * MinMaxScaler
    * NGram

    ## How was this patch tested?

    It is a bunch of tests!

    Author: Joseph K. Bradley

    Closes #20964 from jkbradley/SPARK-22883-part2.
[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964

Merging with master
spark git commit: [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
Repository: spark
Updated Branches:
  refs/heads/master 3cb82047f -> 75a183071

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley

Closes #20964 from jkbradley/SPARK-22883-part2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75a18307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75a18307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75a18307

Branch: refs/heads/master
Commit: 75a183071c4ed2e407c930edfdf721779662b3ee
Parents: 3cb8204
Author: Joseph K. Bradley
Authored: Wed Apr 11 09:59:38 2018 -0700
Committer: Joseph K. Bradley
Committed: Wed Apr 11 09:59:38 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/feature/IDFSuite.scala  | 14 +++---
 .../apache/spark/ml/feature/ImputerSuite.scala  | 31 ++---
 .../spark/ml/feature/InteractionSuite.scala     | 46 ++--
 .../spark/ml/feature/MaxAbsScalerSuite.scala    | 14 +++---
 .../spark/ml/feature/MinHashLSHSuite.scala      | 25 ---
 .../spark/ml/feature/MinMaxScalerSuite.scala    | 14 +++---
 .../apache/spark/ml/feature/NGramSuite.scala    |  2 +-
 7 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
 package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row

-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {

   import testImplicits._

@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       math.log((numOfData + 1.0) / (x + 1.0))
     })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     MLTestingUtils.checkCopyAndUids(idfEst, idfModel)

-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
     })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       .setMinDocFreq(1)
      .fit(df)

-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala

diff --git
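The expected values that IDFSuite builds above follow the standard smoothed IDF identity: for corpus size m and per-feature document frequency df, the weight is log((m + 1) / (df + 1)), zeroed out when df falls below minDocFreq. A pure-Python sketch of that computation (the `idf_values` helper and its `min_doc_freq` handling are illustrative, not Spark API):

```python
import math

def idf_values(doc_freqs, num_docs, min_doc_freq=0):
    """Smoothed IDF weights as asserted in IDFSuite: log((m + 1) / (df + 1)),
    with features below min_doc_freq forced to 0."""
    return [
        math.log((num_docs + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0
        for df in doc_freqs
    ]
```

With the suite's data (4 documents, per-feature document frequencies [0, 3, 1, 2]), the first feature gets log(5) without a minimum document frequency and 0 once minDocFreq is 1, which is exactly the two cases the suite checks.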
[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964

I rebased off of master because of the merge warning in the last tests. I did not have to resolve any conflicts. I'll merge this once tests pass.
spark git commit: [SPARK-23944][ML] Add the set method for the two LSHModel
Repository: spark
Updated Branches:
  refs/heads/master 4f1e8b9bb -> 7c7570d46

[SPARK-23944][ML] Add the set method for the two LSHModel

## What changes were proposed in this pull request?

Add two set method for LSHModel in LSH.scala, BucketedRandomProjectionLSH.scala, and MinHashLSH.scala

## How was this patch tested?

New test for the param setup was added into
- BucketedRandomProjectionLSHSuite.scala
- MinHashLSHSuite.scala

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Lu WANG

Closes #21015 from ludatabricks/SPARK-23944.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c7570d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c7570d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c7570d4

Branch: refs/heads/master
Commit: 7c7570d466a8ded51e580eb6a28583bd9a9c5337
Parents: 4f1e8b9
Author: Lu WANG
Authored: Tue Apr 10 17:26:06 2018 -0700
Committer: Joseph K. Bradley
Committed: Tue Apr 10 17:26:06 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/feature/BucketedRandomProjectionLSH.scala      | 8 
 mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala  | 6 ++
 .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 8 
 .../spark/ml/feature/BucketedRandomProjectionLSHSuite.scala | 8 
 .../scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala | 8 
 5 files changed, 38 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
index 36a46ca..41eaaf9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -73,6 +73,14 @@ class BucketedRandomProjectionLSHModel private[ml](
     private[ml] val randUnitVectors: Array[Vector])
   extends LSHModel[BucketedRandomProjectionLSHModel] with BucketedRandomProjectionLSHParams {

+  /** @group setParam */
+  @Since("2.4.0")
+  override def setInputCol(value: String): this.type = super.set(inputCol, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setOutputCol(value: String): this.type = super.set(outputCol, value)
+
   @Since("2.1.0")
   override protected[ml] val hashFunction: Vector => Array[Vector] = {
     key: Vector => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
index 1c9f47a..a70931f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -65,6 +65,12 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
   extends Model[T] with LSHParams with MLWritable {
   self: T =>

+  /** @group setParam */
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
   /**
    * The hash function of LSH, mapping an input feature vector to multiple hash vectors.
    * @return The mapping of LSH function.

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
index 145422a..556848e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
@@ -51,6 +51,14 @@ class MinHashLSHModel private[ml](
     private[ml] val randCoefficients: Array[(Int, Int)])
   extends LSHModel[MinHashLSHModel] {

+  /** @group setParam */
+  @Since("2.4.0")
+  override def setInputCol(value: String): this.type = super.set(inputCol, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  override def setOutputCol(value: String): this.type = super.set(outputCol, value)
+
   @Since("2.1.0")
   override protected[ml] val hashFunction: Vector => Array[Vector] = {
     elems: Vector => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7570d4/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala
-
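For context on what MinHashLSHModel's `hashFunction` (visible in the diff above) computes: it maps a vector to one hash value per random `(a, b)` coefficient pair by taking the minimum of an affine map over the vector's active (non-zero) indices. A hedged pure-Python sketch of that MinHash scheme — the `PRIME` constant and exact affine form here are assumptions in the spirit of MinHash, not values copied from Spark's source:

```python
# Illustrative MinHash sketch; PRIME and the affine form are assumptions,
# not Spark's exact implementation constants.
PRIME = 2038074743

def min_hash(active_indices, coefficients):
    """One signature value per (a, b) pair: the minimum of the affine map
    ((1 + idx) * a + b) mod PRIME over the active feature indices."""
    return [
        min(((1 + idx) * a + b) % PRIME for idx in active_indices)
        for (a, b) in coefficients
    ]
```

Inputs that share many active indices tend to agree on many signature positions, which is what makes the scheme usable for approximate Jaccard-similarity search.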
[GitHub] spark issue #21015: [SPARK-23944][ML] Add the set method for the two LSHMode...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21015

LGTM
Merging with master
Thanks @ludatabricks !
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/15770

@wangmiao1981 Do let me know if you're too busy now to resume this; I know it's been a long time. Thanks!
[GitHub] spark issue #20964: [SPARK-22883] ML test for StructuredStreaming: spark.ml....
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20964

Thanks! I'll rerun tests since they are stale and merge after they pass.
spark git commit: [SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleInvalid
Repository: spark
Updated Branches:
  refs/heads/master adb222b95 -> 4f1e8b9bb

[SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleInvalid

## What changes were proposed in this pull request?

add python api for VectorAssembler handleInvalid

## How was this patch tested?

Add doctest

Author: Huaxin Gao

Closes #21003 from huaxingao/spark-23871.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f1e8b9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f1e8b9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f1e8b9b

Branch: refs/heads/master
Commit: 4f1e8b9bb7d795d4ca3d5cd5dcc0f9419e52dfae
Parents: adb222b
Author: Huaxin Gao
Authored: Tue Apr 10 15:41:45 2018 -0700
Committer: Joseph K. Bradley
Committed: Tue Apr 10 15:41:45 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/feature/VectorAssembler.scala | 12 +++---
 python/pyspark/ml/feature.py               | 42 +---
 2 files changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4f1e8b9b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 6bf4aa3..4061154 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -71,12 +71,12 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
    */
   @Since("2.4.0")
   override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
-    """Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
-      |invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
-      |output). Column lengths are taken from the size of ML Attribute Group, which can be set using
-      |`VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
-      |from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
-      |""".stripMargin.replaceAll("\n", " "),
+    """Param for how to handle invalid data (NULL and NaN values). Options are 'skip' (filter out
+      |rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN
+      |in the output). Column lengths are taken from the size of ML Attribute Group, which can be
+      |set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also
+      |be inferred from first rows of the data since it is safe to do so but only in case of 'error'
+      |or 'skip'.""".stripMargin.replaceAll("\n", " "),
     ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))

   setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f1e8b9b/python/pyspark/ml/feature.py

diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 5a3e0dd..cdda30c 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2701,7 +2701,8 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java

 @inherit_doc
-class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, JavaMLReadable, JavaMLWritable):
+class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, HasHandleInvalid, JavaMLReadable,
+                      JavaMLWritable):
     """
     A feature transformer that merges multiple columns into a vector column.

@@ -2719,25 +2720,56 @@ class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, JavaMLReadabl
     >>> loadedAssembler = VectorAssembler.load(vectorAssemblerPath)
     >>> loadedAssembler.transform(df).head().freqs == vecAssembler.transform(df).head().freqs
     True
+    >>> dfWithNullsAndNaNs = spark.createDataFrame(
+    ...    [(1.0, 2.0, None), (3.0, float("nan"), 4.0), (5.0, 6.0, 7.0)], ["a", "b", "c"])
+    >>> vecAssembler2 = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features",
+    ...    handleInvalid="keep")
+    >>> vecAssembler2.transform(dfWithNullsAndNaNs).show()
+    +---+---+----+-------------+
+    |  a|  b|   c|     features|
+    +---+---+----+-------------+
+    |1.0|2.0|null|[1.0,2.0,NaN]|
+    |3.0|NaN| 4.0|[3.0,NaN,4.0]|
+    |5.0|6.0| 7.0|[5.0,6.0,7.0]|
+    +---+---+----+-------------+
+    ...
+    >>> vecAssembler2.setParams(handleInvalid="skip").transform(dfWithNullsAndNaNs).show()
+    +---+---+---+-+
+    | a| b| c| feature
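The doctest above exercises the three `handleInvalid` modes. A self-contained sketch of the same skip/error/keep semantics on plain Python rows (the `assemble` helper is hypothetical, written to mirror the documented behavior, not PySpark API):

```python
import math

def assemble(rows, handle_invalid="error"):
    """Mimic VectorAssembler's handleInvalid semantics on fixed-length rows.

    'skip'  -> drop rows containing None/NaN
    'error' -> raise on the first invalid row (the default)
    'keep'  -> pass values through, mapping None to NaN
    """
    def bad(v):
        return v is None or (isinstance(v, float) and math.isnan(v))

    out = []
    for row in rows:
        if any(bad(v) for v in row):
            if handle_invalid == "skip":
                continue
            if handle_invalid == "error":
                raise ValueError(f"invalid entry in row {row!r}")
        out.append([float("nan") if v is None else v for v in row])
    return out
```

On the doctest's data, 'skip' keeps only the fully valid row `[5.0, 6.0, 7.0]`, while 'keep' preserves all three rows with NaN in the invalid positions, matching the tables shown above.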
[GitHub] spark issue #21003: [SPARK-23871][ML][PYTHON]add python api for VectorAssemb...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21003

Merging with master
Thanks @huaxingao !
[GitHub] spark issue #21003: [SPARK-23871][ML][PYTHON]add python api for VectorAssemb...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21003

LGTM
But would you mind making one fix in the doc here and in Scala? "Param for how to handle invalid data (NULL values)" should actually read "Param for how to handle invalid data (NULL **and NaN** values)"
Thanks!
[GitHub] spark issue #21030: typo rawPredicition changed to rawPrediction
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21030

LGTM pending tests
Thanks for finding & fixing this! Would you mind creating a JIRA and linking it to https://issues.apache.org/jira/browse/SPARK-21856 ? I'd like a tracking JIRA since I'd like to backport it to branch-2.3.
[GitHub] spark issue #21030: typo rawPredicition changed to rawPrediction
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21030

ok to test
spark git commit: [SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pyspark.ml
Repository: spark
Updated Branches:
  refs/heads/master e17965891 -> adb222b95

[SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pyspark.ml

## What changes were proposed in this pull request?

Kolmogorov-Smirnoff test Python API in `pyspark.ml`

**Note** API with `CDF` is a little difficult to support in python. We can add it in following PR.

## How was this patch tested?

doctest

Author: WeichenXu

Closes #20904 from WeichenXu123/ks-test-py.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adb222b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adb222b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adb222b9

Branch: refs/heads/master
Commit: adb222b957f327a69929b8f16fa5ebc071fa99e3
Parents: e179658
Author: WeichenXu
Authored: Tue Apr 10 11:18:14 2018 -0700
Committer: Joseph K. Bradley
Committed: Tue Apr 10 11:18:14 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/stat/KolmogorovSmirnovTest.scala |  29 +--
 python/pyspark/ml/stat.py                     | 181 +--
 2 files changed, 138 insertions(+), 72 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/adb222b9/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
index c62d746..af8ff64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -24,7 +24,7 @@
 import org.apache.spark.api.java.function.Function
 import org.apache.spark.ml.util.SchemaUtils
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions.col

 /**
@@ -59,7 +59,7 @@ object KolmogorovSmirnovTest {
    * distribution of the sample data and the theoretical distribution we can provide a test for the
    * the null hypothesis that the sample data comes from that theoretical distribution.
    *
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data to test
    * @param sampleCol Name of sample column in dataset, of any numerical type
    * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
    * @return DataFrame containing the test result for the input sampled data.
@@ -68,10 +68,10 @@ object KolmogorovSmirnovTest {
    *  - `statistic: Double`
    */
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): DataFrame = {
+  def test(dataset: Dataset[_], sampleCol: String, cdf: Double => Double): DataFrame = {
     val spark = dataset.sparkSession
-    val rdd = getSampleRDD(dataset, sampleCol)
+    val rdd = getSampleRDD(dataset.toDF(), sampleCol)
     val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, cdf)
     spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
       testResult.pValue, testResult.statistic)))
@@ -81,10 +81,11 @@ object KolmogorovSmirnovTest {
    * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)`
    */
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String,
-    cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
-    val f: Double => Double = x => cdf.call(x)
-    test(dataset, sampleCol, f)
+  def test(
+      dataset: Dataset[_],
+      sampleCol: String,
+      cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+    test(dataset, sampleCol, (x: Double) => cdf.call(x))
   }

@@ -92,10 +93,11 @@ object KolmogorovSmirnovTest {
    * distribution equality. Currently supports the normal distribution, taking as parameters
    * the mean and standard deviation.
    *
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data to test
    * @param sampleCol Name of sample column in dataset, of any numerical type
    * @param distName a `String` name for a theoretical distribution, currently only support "norm".
-   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
+   * @param params `Double*` specifying the parameters to be used for the theoretical distribution.
+   *               For "norm" distribution, the parameters includes mean and variance.
    * @return DataFrame containing the test result for the input sampled data.
   *
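The statistic this API reports is the largest gap between the sample's empirical CDF and the theoretical CDF. A stdlib-only sketch that reproduces the 0.175 value shown in this PR's doctests (`norm_cdf` and `ks_statistic` are illustrative helpers, not pyspark API):

```python
import math

def norm_cdf(x, mean=0.0, std=1.0):
    """Normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf((x - mean) / (std * math.sqrt(2.0))))

def ks_statistic(sample, cdf):
    """One-sample Kolmogorov-Smirnov statistic: the largest deviation between
    the empirical CDF of the sorted sample and the theoretical CDF, checking
    the step both just before and just after each sample point."""
    xs = sorted(sample)
    n = len(xs)
    d = 0.0
    for i, x in enumerate(xs, start=1):
        f = cdf(x)
        d = max(d, abs(i / n - f), abs(f - (i - 1) / n))
    return d
```

For the sample `[-1.0, 0.0, 1.0]` against the standard normal, this gives a statistic that rounds to 0.175, and by symmetry the shifted sample `[2.0, 3.0, 4.0]` against N(3, 1) gives the same value, matching the doctest output quoted later in this thread.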
[GitHub] spark issue #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff test Pyth...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20904

LGTM
Thanks for the PR! Merging with master
[GitHub] spark issue #21015: [SPARK-23944][ML] Add the set method for the two LSHMode...
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/21015

add to whitelist
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r180228711

--- Diff: python/pyspark/ml/stat.py ---
@@ -127,13 +113,86 @@ class Correlation(object):
     def corr(dataset, column, method="pearson"):
         """
         Compute the correlation matrix with specified method using dataset.
+
+        :param dataset:
+          A Dataset or a DataFrame.
+        :param column:
+          The name of the column of vectors for which the correlation coefficient needs
+          to be computed. This must be a column of the dataset, and it must contain
+          Vector objects.
+        :param method:
+          String specifying the method to use for computing correlation.
+          Supported: `pearson` (default), `spearman`.
+        :return:
+          A DataFrame that contains the correlation matrix of the column of vectors. This
+          DataFrame contains a single row and a single column of name
+          '$METHODNAME($COLUMN)'.
         """
         sc = SparkContext._active_spark_context
         javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
         args = [_py2java(sc, arg) for arg in (dataset, column, method)]
         return _java2py(sc, javaCorrObj.corr(*args))
+
+
+class KolmogorovSmirnovTest(object):
+    """
+    .. note:: Experimental
+
+    Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous
+    distribution.
+
+    By comparing the largest difference between the empirical cumulative
+    distribution of the sample data and the theoretical distribution we can provide a test for the
+    the null hypothesis that the sample data comes from that theoretical distribution.
+
+    >>> from pyspark.ml.stat import KolmogorovSmirnovTest
--- End diff --

Thanks for moving the method-specific documentation. These doctests are method-specific too, though, so can you please move them as well?
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r180245120 --- Diff: python/pyspark/ml/stat.py --- @@ -127,13 +113,86 @@ class Correlation(object): def corr(dataset, column, method="pearson"): """ Compute the correlation matrix with specified method using dataset. + +:param dataset: + A Dataset or a DataFrame. +:param column: + The name of the column of vectors for which the correlation coefficient needs + to be computed. This must be a column of the dataset, and it must contain + Vector objects. +:param method: + String specifying the method to use for computing correlation. + Supported: `pearson` (default), `spearman`. +:return: + A DataFrame that contains the correlation matrix of the column of vectors. This + DataFrame contains a single row and a single column of name + '$METHODNAME($COLUMN)'. """ sc = SparkContext._active_spark_context javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation args = [_py2java(sc, arg) for arg in (dataset, column, method)] return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous +distribution. + +By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. 
+ +>>> from pyspark.ml.stat import KolmogorovSmirnovTest +>>> dataset = [[-1.0], [0.0], [1.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first() +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 +>>> dataset = [[2.0], [3.0], [4.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first() +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 + +.. versionadded:: 2.4.0 + +""" +@staticmethod +@since("2.4.0") +def test(dataset, sampleCol, distName, *params): +""" +Perform a Kolmogorov-Smirnov test using dataset. --- End diff -- Can you please make this match the text in the Scala doc? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
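For readers wondering where the `0.175` in the doctest comes from: the two-sided KS statistic is the largest gap between the sample's empirical CDF and the theoretical CDF, checked on both sides of each step. A standard-library-only sketch (not Spark's implementation) reproduces the value:

```python
import math

def normal_cdf(x, mean=0.0, std=1.0):
    # Phi(x) expressed via the error function, so no SciPy is needed.
    return 0.5 * (1.0 + math.erf((x - mean) / (std * math.sqrt(2.0))))

def ks_statistic(sample, cdf):
    # Two-sided Kolmogorov-Smirnov statistic: at each sorted sample point,
    # the empirical CDF jumps from (i-1)/n to i/n; the statistic is the
    # largest distance from the theoretical CDF to either side of the jump.
    xs = sorted(sample)
    n = len(xs)
    d = 0.0
    for i, x in enumerate(xs, start=1):
        f = cdf(x)
        d = max(d, i / n - f, f - (i - 1) / n)
    return d

print(round(ks_statistic([-1.0, 0.0, 1.0], normal_cdf), 3))  # 0.175
```

The second doctest gives the same statistic because `[2.0, 3.0, 4.0]` against `norm(3.0, 1.0)` is just a shifted copy of the first case.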
spark git commit: [SPARK-14681][ML] Provide label/impurity stats for spark.ml decision tree nodes
Repository: spark Updated Branches: refs/heads/master 7c1654e21 -> 252468a74 [SPARK-14681][ML] Provide label/impurity stats for spark.ml decision tree nodes ## What changes were proposed in this pull request? API: ``` trait ClassificationNode extends Node def getLabelCount(label: Int): Double trait RegressionNode extends Node def getCount(): Double def getSum(): Double def getSquareSum(): Double // turn LeafNode to be trait trait LeafNode extends Node { def prediction: Double def impurity: Double ... } class ClassificationLeafNode extends ClassificationNode with LeafNode class RegressionLeafNode extends RegressionNode with LeafNode // turn InternalNode to be trait trait InternalNode extends Node{ def gain: Double def leftChild: Node def rightChild: Node def split: Split ... } class ClassificationInternalNode extends ClassificationNode with InternalNode override def leftChild: ClassificationNode override def rightChild: ClassificationNode class RegressionInternalNode extends RegressionNode with InternalNode override val leftChild: RegressionNode override val rightChild: RegressionNode class DecisionTreeClassificationModel override val rootNode: ClassificationNode class DecisionTreeRegressionModel override val rootNode: RegressionNode ``` Closes #17466 ## How was this patch tested? UT will be added soon. Author: WeichenXu Author: jkbradley Closes #20786 from WeichenXu123/tree_stat_api_2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/252468a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/252468a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/252468a7 Branch: refs/heads/master Commit: 252468a744b95082400ba9e8b2e3b3d9d50ab7fa Parents: 7c1654e Author: WeichenXu Authored: Mon Apr 9 12:18:07 2018 -0700 Committer: Joseph K. 
Bradley Committed: Mon Apr 9 12:18:07 2018 -0700 -- .../classification/DecisionTreeClassifier.scala | 14 +- .../spark/ml/classification/GBTClassifier.scala | 6 +- .../classification/RandomForestClassifier.scala | 6 +- .../ml/regression/DecisionTreeRegressor.scala | 13 +- .../spark/ml/regression/GBTRegressor.scala | 6 +- .../ml/regression/RandomForestRegressor.scala | 6 +- .../scala/org/apache/spark/ml/tree/Node.scala | 247 +++ .../spark/ml/tree/impl/RandomForest.scala | 10 +- .../org/apache/spark/ml/tree/treeModels.scala | 36 ++- .../DecisionTreeClassifierSuite.scala | 31 ++- .../ml/classification/GBTClassifierSuite.scala | 4 +- .../RandomForestClassifierSuite.scala | 5 +- .../regression/DecisionTreeRegressorSuite.scala | 14 ++ .../spark/ml/tree/impl/RandomForestSuite.scala | 22 +- .../apache/spark/ml/tree/impl/TreeTests.scala | 12 +- project/MimaExcludes.scala | 9 +- 16 files changed, 333 insertions(+), 108 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/252468a7/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 65cce69..771cd4f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala @@ -165,7 +165,7 @@ object DecisionTreeClassifier extends DefaultParamsReadable[DecisionTreeClassifi @Since("1.4.0") class DecisionTreeClassificationModel private[ml] ( @Since("1.4.0")override val uid: String, -@Since("1.4.0")override val rootNode: Node, +@Since("1.4.0")override val rootNode: ClassificationNode, @Since("1.6.0")override val numFeatures: Int, @Since("1.5.0")override val numClasses: Int) extends ProbabilisticClassificationModel[Vector, DecisionTreeClassificationModel] @@ -178,7 +178,7 @@ class 
DecisionTreeClassificationModel private[ml] ( * Construct a decision tree classification model. * @param rootNode Root node of tree, with other nodes attached. */ - private[ml] def this(rootNode: Node, numFeatures: Int, numClasses: Int) = + private[ml] def this(rootNode: ClassificationNode, numFeatures: Int, numClasses: Int) = this(Identifiable.randomUID("dtc"), rootNode, numFeatures, numClasses) override def predict(features: Vector): Double = { @@ -276,8 +276,9 @@ object DecisionTreeClassificationModel extends MLReadable[DecisionTreeClassifica val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val
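The trait layout in the commit message above crosses two axes (leaf vs. internal, classification vs. regression) and combines them with multiple inheritance. The same shape can be mimicked in Python with mixin classes; all names below are illustrative, not Spark's Python API, and only the classification/leaf corner is shown:

```python
from abc import ABC, abstractmethod

class Node(ABC):
    @property
    @abstractmethod
    def prediction(self): ...

class ClassificationNode(Node):
    # Carries per-label impurity stats, like getLabelCount in the Scala API.
    def __init__(self, label_counts):
        self.label_counts = label_counts

    def get_label_count(self, label):
        return self.label_counts[label]

class LeafNode(Node):
    # Concrete `prediction` satisfies the abstract property from Node.
    @property
    def prediction(self):
        return self._prediction

class ClassificationLeafNode(ClassificationNode, LeafNode):
    def __init__(self, label_counts):
        super().__init__(label_counts)
        # Predict the label with the highest count.
        self._prediction = max(range(len(label_counts)),
                               key=label_counts.__getitem__)

leaf = ClassificationLeafNode([1.0, 5.0, 2.0])
print(leaf.prediction)          # 1
print(leaf.get_label_count(1))  # 5.0
```

The point of the design is that callers can ask a classification node for label counts without caring whether it is a leaf or an internal node.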
[GitHub] spark issue #20786: [SPARK-14681][ML] Provide label/impurity stats for spark...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20786 LGTM Merging with master Thanks @WeichenXu123 !
[GitHub] spark issue #18227: [SPARK-21005][ML] Fix VectorIndexerModel does not prepar...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/18227 Just commented on the JIRA about this issue: https://issues.apache.org/jira/browse/SPARK-21005
[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20825 I actually would prefer not to merge this change since it could blow up the size of the strings printed for some classification tasks with large numbers of labels. If people want to debug, they could trace through the tree manually. Alternatively, I'd be OK with adding an optional argument which tells toDebugString to include the stats.
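The opt-in flag suggested in the comment above could look like the following sketch. The node shape (`prediction`, `stats`, `left`/`right`/`threshold` attributes) is hypothetical and only stands in for a real tree model:

```python
def to_debug_string(node, include_stats=False, depth=0):
    # Recursive tree printer. Stats are opt-in, so classification tasks
    # with many labels do not blow up the string size by default.
    pad = "  " * depth
    if getattr(node, "left", None) is None:  # leaf node
        line = f"{pad}Predict: {node.prediction}"
        if include_stats and getattr(node, "stats", None) is not None:
            line += f" (stats: {node.stats})"
        return line
    return "\n".join([
        f"{pad}If (feature <= {node.threshold})",
        to_debug_string(node.left, include_stats, depth + 1),
        f"{pad}Else",
        to_debug_string(node.right, include_stats, depth + 1),
    ])
```

With `include_stats=False` (the default) the output is unchanged; passing `True` appends the per-node impurity stats.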
[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20319 @smurakozi Thanks for the PR! I have bandwidth to review this now. Do you have time to rebase this to fix the merge conflicts?
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179831482 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous +distribution. + +By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. +:param sampleCol: + Name of sample column in dataset, of any numerical type. +:param distName: + a `string` name for a theoretical distribution, currently only support "norm". +:param params: + a list of `Double` values specifying the parameters to be used for the theoretical + distribution +:return: + A dataframe that contains the Kolmogorov-Smirnov test result for the input sampled data. + This DataFrame will contain a single Row with the following fields: + - `pValue: Double` + - `statistic: Double` + +>>> from pyspark.ml.stat import KolmogorovSmirnovTest +>>> dataset = [[-1.0], [0.0], [1.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).collect()[0] --- End diff -- nit: use first() instead of collect()[0] --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179833156 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous +distribution. + +By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. +:param sampleCol: + Name of sample column in dataset, of any numerical type. +:param distName: + a `string` name for a theoretical distribution, currently only support "norm". +:param params: + a list of `Double` values specifying the parameters to be used for the theoretical --- End diff -- I realized we should list what the parameters are, both here and in the Scala docs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179824556 --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala --- @@ -102,10 +102,11 @@ object KolmogorovSmirnovTest { */ @Since("2.4.0") @varargs - def test(dataset: DataFrame, sampleCol: String, distName: String, params: Double*): DataFrame = { + def test(dataset: Dataset[_], sampleCol: String, distName: String, params: Double*) --- End diff -- nit: This doesn't fit scala style; please get familiar with the style we use for multi-line function headers! Just check out other parts of MLlib for examples.
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179832593 --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala --- @@ -81,7 +81,7 @@ object KolmogorovSmirnovTest { * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)` */ @Since("2.4.0") - def test(dataset: DataFrame, sampleCol: String, + def test(dataset: Dataset[_], sampleCol: String, cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = { --- End diff -- I guess I missed this before. Would you mind fixing the scala style here too?
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179830986 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous +distribution. + +By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. --- End diff -- nit: dataset -> Dataset, dataframe -> DataFrame (It's nice to write class names the way they are defined.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179832114 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,65 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous +distribution. + +By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: --- End diff -- I see you're following the example of ChiSquareTest, but this Param documentation belongs with the test method, not the class. Could you please shift it? (Feel free to correct it for ChiSquareTest here or in another PR.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179824228 --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala --- @@ -59,7 +59,7 @@ object KolmogorovSmirnovTest { * distribution of the sample data and the theoretical distribution we can provide a test for the * the null hypothesis that the sample data comes from that theoretical distribution. * - * @param dataset a `DataFrame` containing the sample of data to test + * @param dataset A dataset or a dataframe containing the sample of data to test --- End diff -- nit: It's nicer to keep single back quotes ``` `DataFrame` ``` to make these show up as code in docs for clarity. No need to get rid of that.
spark git commit: [SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID and logging levels
Repository: spark Updated Branches: refs/heads/master c926acf71 -> d23a805f9 [SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID and logging levels ## What changes were proposed in this pull request? Initial PR for Instrumentation improvements: UUID and logging levels. This PR takes over #20837 Closes #20837 ## How was this patch tested? Manual. Author: Bago Amirbekian Author: WeichenXu Closes #20982 from WeichenXu123/better-instrumentation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d23a805f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d23a805f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d23a805f Branch: refs/heads/master Commit: d23a805f975f209f273db2b52de3f336be17d873 Parents: c926acf Author: Bago Amirbekian Authored: Fri Apr 6 10:09:55 2018 -0700 Committer: Joseph K. Bradley Committed: Fri Apr 6 10:09:55 2018 -0700 -- .../ml/classification/LogisticRegression.scala | 15 +--- .../apache/spark/ml/util/Instrumentation.scala | 40 2 files changed, 41 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d23a805f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 3ae4db3..ee4b010 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -517,6 +517,9 @@ class LogisticRegression @Since("1.2.0") ( (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } +instr.logNamedValue(Instrumentation.loggerTags.numExamples, summarizer.count) +instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) 
+instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) val histogram = labelSummarizer.histogram val numInvalid = labelSummarizer.countInvalid @@ -560,15 +563,15 @@ class LogisticRegression @Since("1.2.0") ( if (numInvalid != 0) { val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + s"Found $numInvalid invalid labels." -logError(msg) +instr.logError(msg) throw new SparkException(msg) } val isConstantLabel = histogram.count(_ != 0.0) == 1 if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) { -logWarning(s"All labels are the same value and fitIntercept=true, so the coefficients " + - s"will be zeros. Training is not needed.") +instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " + + s"coefficients will be zeros. Training is not needed.") val constantLabelIndex = Vectors.dense(histogram).argmax val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures, new Array[Int](numCoefficientSets + 1), Array.empty[Int], Array.empty[Double], @@ -581,7 +584,7 @@ class LogisticRegression @Since("1.2.0") ( (coefMatrix, interceptVec, Array.empty[Double]) } else { if (!$(fitIntercept) && isConstantLabel) { - logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " + + instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " + s"dangerous ground, so the algorithm may not converge.") } @@ -590,7 +593,7 @@ class LogisticRegression @Since("1.2.0") ( if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { - logWarning("Fitting LogisticRegressionModel without intercept on dataset with " + + instr.logWarning("Fitting LogisticRegressionModel without intercept on dataset with " + "constant nonzero column, Spark MLlib outputs zero coefficients for constant " + "nonzero columns. 
This behavior is the same as R glmnet but different from LIBSVM.") } @@ -708,7 +711,7 @@ class LogisticRegression @Since("1.2.0") ( (_initialModel.interceptVector.size == numCoefficientSets) && (_initialModel.getFitIntercept == $(fitIntercept)) if (!modelIsValid) { - logWarning(s"Initial coefficients will be ignored! Its dimensions " + + instr.logWarning(s"Initial coefficients will be ignored! Its dimensions " + s"(${providedCoefs.numRows},
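The pattern behind the `instr.logWarning`/`instr.logNamedValue` calls in the diff above is to route all training-time messages through a per-fit instrumentation object so log lines from concurrent fits can be told apart. A minimal sketch (names and format are illustrative, not Spark's actual implementation):

```python
import logging
import uuid

class Instrumentation:
    """Routes log messages through a per-fit UUID-tagged prefix, so that
    output from concurrent fits of the same estimator can be correlated."""

    def __init__(self, estimator_name):
        self.prefix = f"{estimator_name}-{uuid.uuid4()}"
        self.logger = logging.getLogger(self.prefix)

    def log_named_value(self, name, value):
        # Structured key/value pairs, e.g. numExamples, label weights.
        self.logger.info("%s: {%s: %s}", self.prefix, name, value)

    def log_warning(self, msg):
        self.logger.warning("%s: %s", self.prefix, msg)

instr = Instrumentation("LogisticRegression")
instr.log_named_value("numExamples", 1000)
instr.log_warning("All labels are the same value and fitIntercept=true, "
                  "so the coefficients will be zeros.")
```

Because each `Instrumentation` carries its own UUID, two overlapping `fit()` calls produce distinguishable log streams.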
[GitHub] spark issue #20982: [SPARK-23859][ML] Initial PR for Instrumentation improve...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20982 LGTM Merging with master Thanks @WeichenXu123 and @MrBago !
[GitHub] spark issue #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20837 We can close this issue now that it's been replaced by https://github.com/apache/spark/pull/20982
[GitHub] spark issue #20994: [SPARK-21898][ML][FOLLOWUP] Fix Scala 2.12 build.
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20994 LGTM (assuming it builds with 2.12 now?)
spark git commit: [SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssembler to handle invalid values in non-string columns
Repository: spark Updated Branches: refs/heads/master 4807d381b -> f2ac08795 [SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssembler to handle invalid values in non-string columns ## What changes were proposed in this pull request? `handleInvalid` Param was forwarded to the VectorAssembler used by RFormula. ## How was this patch tested? added a test and ran all tests for RFormula and VectorAssembler Author: Yogesh Garg Closes #20970 from yogeshg/spark_23562. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2ac0879 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2ac0879 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2ac0879 Branch: refs/heads/master Commit: f2ac0879561cde63ed4eb759f5efa0a5ce393a22 Parents: 4807d38 Author: Yogesh Garg Authored: Thu Apr 5 19:55:42 2018 -0700 Committer: Joseph K. Bradley Committed: Thu Apr 5 19:55:42 2018 -0700 -- .../org/apache/spark/ml/feature/RFormula.scala | 1 + .../apache/spark/ml/feature/RFormulaSuite.scala | 23 2 files changed, 24 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2ac0879/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 22e7b8b..e214765 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -278,6 +278,7 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) encoderStages += new VectorAssembler(uid) .setInputCols(encodedTerms.toArray) .setOutputCol($(featuresCol)) + .setHandleInvalid($(handleInvalid)) encoderStages += new VectorAttributeRewriter($(featuresCol), prefixesToRewrite.toMap) encoderStages += new ColumnPruner(tempColumns.toSet) 
http://git-wip-us.apache.org/repos/asf/spark/blob/f2ac0879/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala index 27d570f..a250331 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.ml.feature +import org.apache.spark.SparkException import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite @@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { assert(features.toArray === a +: b.toArray) } } + + test("SPARK-23562 RFormula handleInvalid should handle invalid values in non-string columns.") { +val d1 = Seq( + (1001L, "a"), + (1002L, "b")).toDF("id1", "c1") +val d2 = Seq[(java.lang.Long, String)]( + (20001L, "x"), + (20002L, "y"), + (null, null)).toDF("id2", "c2") +val dataset = d1.crossJoin(d2) + +def get_output(mode: String): DataFrame = { + val formula = new RFormula().setFormula("c1 ~ id2").setHandleInvalid(mode) + formula.fit(dataset).transform(dataset).select("features", "label") +} + +assert(intercept[SparkException](get_output("error").collect()) + .getMessage.contains("Encountered null while assembling a row")) +assert(get_output("skip").count() == 4) +assert(get_output("keep").count() == 6) + } + } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
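The `error`/`skip`/`keep` semantics exercised by the test above can be sketched in plain Python (names hypothetical; this only mirrors the behavior, not Spark's VectorAssembler):

```python
import math

def assemble(rows, mode="error"):
    # 'error': raise on any null input; 'skip': drop rows containing nulls;
    # 'keep': pass nulls through as NaN so downstream stages can handle them.
    out = []
    for row in rows:
        if any(v is None for v in row):
            if mode == "error":
                raise ValueError("Encountered null while assembling a row")
            if mode == "skip":
                continue
            row = [math.nan if v is None else v for v in row]
        out.append(row)
    return out

# Cross join of 2 x 3 rows, two of which contain a null, as in the test.
rows = [[1001, 20001], [1001, 20002], [1001, None],
        [1002, 20001], [1002, 20002], [1002, None]]
print(len(assemble(rows, "skip")))  # 4
print(len(assemble(rows, "keep")))  # 6
```

This matches the counts asserted in the RFormulaSuite test: `skip` drops the two null rows, `keep` retains all six.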
[GitHub] spark issue #20970: [SPARK-23870][ML] Forward RFormula handleInvalid Param t...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20970 LGTM Merging with master Thanks @yogeshg !
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178988503 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol with HasWeightCol { + + /** + * The number of clusters to create (k). Must be > 1. Default: 2. + * @group param + */ + @Since("2.3.0") + final val k = new IntParam(this, "k", "The number of clusters to create. " + +"Must be > 1.", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("2.3.0") + def getK: Int = $(k) + + /** + * Param for the initialization algorithm. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. + */ + @Since("2.3.0") + final val initMode = { +val allowedParams = ParamValidators.inArray(Array("random", "degree")) +new Param[String](this, "initMode", "The initialization algorithm. " + + "Supported options: 'random' and 'degree'.", allowedParams) + } + + /** @group expertGetParam */ + @Since("2.3.0") + def getInitMode: String = $(initMode) + + /** + * Param for the column name for ids returned by PowerIterationClustering.transform(). 
+ * Default: "id" + * @group param + */ + @Since("2.3.0") + val idCol = new Param[String](this, "id", "column name for ids.") + + /** @group getParam */ + @Since("2.3.0") + def getIdCol: String = $(idCol) + + /** + * Param for the column name for neighbors required by PowerIterationClustering.transform(). + * Default: "neighbor" + * @group param + */ + @Since("2.3.0") + val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.") + + /** @group getParam */ + @Since("2.3.0") + def getNeighborCol: String = $(neighborCol) + + /** + * Validates the input schema + * @param schema input schema + */ + protected def validateSchema(schema: StructType): Unit = { +SchemaUtils.checkColumnType(schema, $(idCol), LongType) +SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType) + } +} + +/** + * :: Experimental :: + * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by + * http://www.icml2010.org/papers/387.pdf>Lin and Cohen. From the abstract: + * PIC finds a very low-dimensional embedding of a dataset using truncated power + * iteration on a normalized pair-wise similarity matrix of the data. + * + * Note that we implement [[PowerIterationClustering]] as a transformer.
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178984276 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol with HasWeightCol { + + /** + * The number of clusters to create (k). Must be > 1. Default: 2. + * @group param + */ + @Since("2.3.0") + final val k = new IntParam(this, "k", "The number of clusters to create. " + +"Must be > 1.", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("2.3.0") + def getK: Int = $(k) + + /** + * Param for the initialization algorithm. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. + */ + @Since("2.3.0") + final val initMode = { +val allowedParams = ParamValidators.inArray(Array("random", "degree")) +new Param[String](this, "initMode", "The initialization algorithm. " + + "Supported options: 'random' and 'degree'.", allowedParams) + } + + /** @group expertGetParam */ + @Since("2.3.0") + def getInitMode: String = $(initMode) + + /** + * Param for the column name for ids returned by PowerIterationClustering.transform(). 
+ * Default: "id" + * @group param + */ + @Since("2.3.0") + val idCol = new Param[String](this, "id", "column name for ids.") + + /** @group getParam */ + @Since("2.3.0") + def getIdCol: String = $(idCol) + + /** + * Param for the column name for neighbors required by PowerIterationClustering.transform(). + * Default: "neighbor" + * @group param + */ + @Since("2.3.0") + val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.") + + /** @group getParam */ + @Since("2.3.0") + def getNeighborCol: String = $(neighborCol) + + /** + * Validates the input schema + * @param schema input schema + */ + protected def validateSchema(schema: StructType): Unit = { +SchemaUtils.checkColumnType(schema, $(idCol), LongType) +SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType) + } +} + +/** + * :: Experimental :: + * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by + * http://www.icml2010.org/papers/387.pdf>Lin and Cohen. From the abstract: + * PIC finds a very low-dimensional embedding of a dataset using truncated power + * iteration on a normalized pair-wise similarity matrix of the data. + * + * Note that we implement [[PowerIterationClustering]] as a transformer.
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178992899 --- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import scala.collection.mutable + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +class PowerIterationClusteringSuite extends SparkFunSuite + with MLlibTestSparkContext with DefaultReadWriteTest { + + @transient var data: Dataset[_] = _ + @transient var malData: Dataset[_] = _ + final val r1 = 1.0 + final val n1 = 10 + final val r2 = 4.0 + final val n2 = 40 + + override def beforeAll(): Unit = { +super.beforeAll() + +data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2) + } + + test("default parameters") { +val pic = new PowerIterationClustering() + +assert(pic.getK === 2) +assert(pic.getMaxIter === 20) +assert(pic.getInitMode === "random") +assert(pic.getFeaturesCol === "features") +assert(pic.getPredictionCol === "prediction") +assert(pic.getIdCol === "id") +assert(pic.getWeightCol === "weight") +assert(pic.getNeighborCol === "neighbor") + } + + test("set parameters") { +val pic = new PowerIterationClustering() + .setK(9) + .setMaxIter(33) + .setInitMode("degree") + .setFeaturesCol("test_feature") + .setPredictionCol("test_prediction") + .setIdCol("test_id") + .setWeightCol("test_weight") + .setNeighborCol("test_neighbor") + +assert(pic.getK === 9) +assert(pic.getMaxIter === 33) +assert(pic.getInitMode === "degree") +assert(pic.getFeaturesCol === "test_feature") +assert(pic.getPredictionCol === "test_prediction") +assert(pic.getIdCol === "test_id") +assert(pic.getWeightCol === "test_weight") +assert(pic.getNeighborCol === "test_neighbor") + } + + test("parameters validation") { +intercept[IllegalArgumentException] { + new PowerIterationClustering().setK(1) +} +intercept[IllegalArgumentException] { + new PowerIterationClustering().setInitMode("no_such_a_mode") +} + } + 
+ test("power iteration clustering") { +val n = n1 + n2 + +val model = new PowerIterationClustering() + .setK(2) + .setMaxIter(40) +val result = model.transform(data) + +val predictions = Array.fill(2)(mutable.Set.empty[Long]) +result.select("id", "prediction").collect().foreach { + case Row(id: Long, cluster: Integer) => predictions(cluster) += id +} +assert(predictions.toSet == Set((1 until n1).toSet, (n1 until n).toSet)) + +val result2 = new PowerIterationClustering() + .setK(2) + .setMaxIter(10) + .setInitMode("degree") + .transform(data) +val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) +result2.select("id", "prediction").collect().foreach { + case Row(id: Long, cluster: Integer) => predictions2(cluster) += id +} +assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet)) + +val expectedColumns = Array("id", "prediction") --- End diff -- No need to check this since it's already checked above by result2.select(...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
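The assertions in the quoted test compare clusterings up to a relabeling of the clusters: ids are bucketed by cluster label, and the buckets are compared as a set of sets, so it does not matter which cluster happened to get label 0. A minimal sketch of that pattern with plain Scala collections (the (id, cluster) pairs below are made up for illustration, standing in for `result.select("id", "prediction")`):

```scala
import scala.collection.mutable

// Hypothetical (id, cluster) pairs standing in for the collected DataFrame rows.
val assignments = Seq((1L, 0), (2L, 0), (3L, 0), (10L, 1), (11L, 1))

// Bucket ids by cluster label, as the suite does.
val predictions = Array.fill(2)(mutable.Set.empty[Long])
assignments.foreach { case (id, cluster) => predictions(cluster) += id }

// Comparing a Set of Sets makes the assertion invariant to label permutation:
// swapping labels 0 and 1 would still satisfy this check.
assert(predictions.map(_.toSet).toSet == Set(Set(1L, 2L, 3L), Set(10L, 11L)))
```

This is why the suite can assert exact cluster contents without fixing which label the algorithm assigns to each ring of points.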
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178988149 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol with HasWeightCol { --- End diff -- Also, featuresCol is not used, so it should be removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178991834 --- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import scala.collection.mutable + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +class PowerIterationClusteringSuite extends SparkFunSuite + with MLlibTestSparkContext with DefaultReadWriteTest { + + @transient var data: Dataset[_] = _ + @transient var malData: Dataset[_] = _ --- End diff -- Not used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178987751 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol with HasWeightCol { + + /** + * The number of clusters to create (k). Must be > 1. Default: 2. + * @group param + */ + @Since("2.3.0") + final val k = new IntParam(this, "k", "The number of clusters to create. " + +"Must be > 1.", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("2.3.0") + def getK: Int = $(k) + + /** + * Param for the initialization algorithm. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. + */ + @Since("2.3.0") + final val initMode = { +val allowedParams = ParamValidators.inArray(Array("random", "degree")) +new Param[String](this, "initMode", "The initialization algorithm. " + + "Supported options: 'random' and 'degree'.", allowedParams) + } + + /** @group expertGetParam */ + @Since("2.3.0") + def getInitMode: String = $(initMode) + + /** + * Param for the column name for ids returned by PowerIterationClustering.transform(). 
+ * Default: "id" + * @group param + */ + @Since("2.3.0") + val idCol = new Param[String](this, "id", "column name for ids.") + + /** @group getParam */ + @Since("2.3.0") + def getIdCol: String = $(idCol) + + /** + * Param for the column name for neighbors required by PowerIterationClustering.transform(). + * Default: "neighbor" + * @group param + */ + @Since("2.3.0") + val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.") + + /** @group getParam */ + @Since("2.3.0") + def getNeighborCol: String = $(neighborCol) + + /** + * Validates the input schema --- End diff -- nit: No need for doc like this which is explained by the method title --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178987675 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.{Vector} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.{col} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol { + + /** + * The number of clusters to create (k). 
Must be > 1. Default: 2. + * @group param + */ + @Since("2.2.0") + final val k = new IntParam(this, "k", "The number of clusters to create. " + +"Must be > 1.", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("2.2.0") + def getK: Int = $(k) + + /** + * Param for the initialization algorithm. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. + */ + @Since("2.2.0") + final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + +"Supported options: 'random' and 'degree'.", +(value: String) => validateInitMode(value)) + + private[spark] def validateInitMode(initMode: String): Boolean = { +initMode match { + case "random" => true + case "degree" => true + case _ => false +} + } + + /** @group expertGetParam */ + @Since("2.2.0") + def getInitMode: String = $(initMode) + + /** + * Param for the column name for ids returned by [[PowerIterationClustering.transform()]]. + * Default: "id" + * @group param + */ + val idCol = new Param[String](this, "idCol", "column name for ids.") + + /** @group getParam */ + def getIdCol: String = $(idCol) + + /** + * Validates the input schema + * @param schema input schema + */ + protected def validateSchema(schema: StructType): Unit = { --- End diff -- +1 Also: * This should check other input columns to make sure they are defined. * This should add predictionCol, not check that it exists in the input. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15770: [SPARK-15784][ML]:Add Power Iteration Clustering ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15770#discussion_r178987121 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * Common params for PowerIterationClustering + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol with HasWeightCol { --- End diff -- We should not use weightCol, which is for instance weights, not for this kind of adjacency. Let's add a new Param here, perhaps called neighborWeightCol. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/15770 Just pinged @yanboliang on JIRA about me taking over shepherding this. It will need at least one update: change Since versions from 2.3.0 to 2.4.0. Sorry for the long wait @wangmiao1981 :(
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r178956696 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -351,27 +359,90 @@ private[ml] object DefaultParamsReader { timestamp: Long, sparkVersion: String, params: JValue, + defaultParams: JValue, metadata: JValue, metadataJson: String) { + +private def getValueFromParams(params: JValue): Seq[(String, JValue)] = { + params match { +case JObject(pairs) => pairs +case _ => + throw new IllegalArgumentException( +s"Cannot recognize JSON metadata: $metadataJson.") + } +} + /** * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of the given name. * This can be useful for getting a Param value before an instance of `Params` - * is available. + * is available. This will look up `params` first, if not existing then looking up + * `defaultParams`. */ def getParamValue(paramName: String): JValue = { implicit val format = DefaultFormats - params match { + + // Looking up for `params` first. + var pairs = getValueFromParams(params) + var foundPairs = pairs.filter { case (pName, jsonValue) => +pName == paramName + } + if (foundPairs.length == 0) { +// Looking up for `defaultParams` then. +pairs = getValueFromParams(defaultParams) +foundPairs = pairs.filter { case (pName, jsonValue) => + pName == paramName +} + } + assert(foundPairs.length == 1, s"Expected one instance of Param '$paramName' but found" + +s" ${foundPairs.length} in JSON Params: " + pairs.map(_.toString).mkString(", ")) + + foundPairs.map(_._2).head +} + +/** + * Extract Params from metadata, and set them in the instance. + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. 
This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. + */ +def getAndSetParams( +instance: Params, +skipParams: Option[List[String]] = None): Unit = { + setParams(instance, skipParams, isDefault = false) + + // For metadata file prior to Spark 2.4, there is no default section. + val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion) + if (major >= 2 && minor >= 4) { --- End diff -- This should be: ```if (major > 2 || (major == 2 && minor >= 4)) {``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
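The bug the review comment fixes is worth spelling out: `major >= 2 && minor >= 4` rejects any x.y version with y < 4, so metadata written by, say, Spark 3.0 (major = 3, minor = 0) would wrongly be treated as pre-2.4 and its default-params section skipped. A minimal sketch of both guards (function names are illustrative, not from the PR):

```scala
// Buggy guard from the diff: false for e.g. 3.0 or 3.1, despite those being newer than 2.4.
def buggyHasDefaults(major: Int, minor: Int): Boolean = major >= 2 && minor >= 4

// Suggested fix: true for 2.4+ and for every later major version.
def fixedHasDefaults(major: Int, minor: Int): Boolean =
  major > 2 || (major == 2 && minor >= 4)

assert(fixedHasDefaults(2, 4) && !fixedHasDefaults(2, 3))
assert(fixedHasDefaults(3, 0) && !buggyHasDefaults(3, 0)) // the divergent case
```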
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r178955058 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -905,6 +905,15 @@ trait Params extends Identifiable with Serializable { } } +object Params { --- End diff -- Shall we make this object package private (for cleaner docs)?
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r178955105 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -791,7 +791,7 @@ trait Params extends Identifiable with Serializable { * this method gets called. * @param value the default value */ - protected final def setDefault[T](param: Param[T], value: T): this.type = { + private[ml] final def setDefault[T](param: Param[T], value: T): this.type = { --- End diff -- Makes sense; I like your solution.
[GitHub] spark issue #20970: [SPARK-23562][ML] Forward RFormula handleInvalid Param t...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20970 Side note: In general, when there is a JIRA with subtasks, it's nice to file all PRs against subtasks rather than against the parent task. This helps when we later identify additional subtasks that are needed, such as adding the Python API for VectorAssembler handleInvalid.
[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20970#discussion_r178938488 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala --- @@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { assert(features.toArray === a +: b.toArray) } } + + test("SPARK-23562 RFormula handleInvalid should handle invalid values in non-string columns.") { +val d1 = Seq( + (1001L, "a"), + (1002L, "b")).toDF("id1", "c1") +val d2 = Seq[(java.lang.Long, String)]( + (20001L, "x"), + (20002L, "y"), + (null, null)).toDF("id2", "c2") +val dataset = d1.crossJoin(d2) + +def get_output(mode: String): DataFrame = { + val formula = new RFormula().setFormula("c1 ~ id2").setHandleInvalid(mode) + formula.fit(dataset).transform(dataset).select("features", "label") +} + +intercept[SparkException](get_output("error").collect()) + .getMessage contains "Encountered null while assembling a row" --- End diff -- Also, did you mean to assert that contains returns true? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20970#discussion_r178938415 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala --- @@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { assert(features.toArray === a +: b.toArray) } } + + test("SPARK-23562 RFormula handleInvalid should handle invalid values in non-string columns.") { +val d1 = Seq( + (1001L, "a"), + (1002L, "b")).toDF("id1", "c1") +val d2 = Seq[(java.lang.Long, String)]( + (20001L, "x"), + (20002L, "y"), + (null, null)).toDF("id2", "c2") +val dataset = d1.crossJoin(d2) + +def get_output(mode: String): DataFrame = { + val formula = new RFormula().setFormula("c1 ~ id2").setHandleInvalid(mode) + formula.fit(dataset).transform(dataset).select("features", "label") +} + +intercept[SparkException](get_output("error").collect()) + .getMessage contains "Encountered null while assembling a row" --- End diff -- We try to avoid using infix notation. (We tend towards Java-like use of Scala for simplicity.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
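The two comments above can be condensed into a minimal, self-contained sketch of why the infix form is a problem here: infix `contains` evaluates to a `Boolean` that is silently discarded, so the test can never fail on a wrong message. The message string below is a stand-in, not the exact Spark exception text.

```scala
val msg = "Encountered null while assembling a row: id2" // stand-in message

// Infix, as in the diff: the resulting Boolean is discarded,
// so this line asserts nothing.
msg contains "Encountered null while assembling a row"

// Preferred: dot notation wrapped in an explicit assert,
// which actually fails the test when the message does not match.
assert(msg.contains("Encountered null while assembling a row"))
```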
[GitHub] spark pull request #20837: [SPARK-23686][ML][WIP] Better instrumentation
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20837#discussion_r178905201 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala --- @@ -517,6 +517,9 @@ class LogisticRegression @Since("1.2.0") ( (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } +instr.logNamedValue(Instrumentation.loggerTags.numExamples, summarizer.count) +instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) +instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.min.toString) --- End diff -- I'm OK with not logging the full histogram here. There's a typo, where "highestLabelWeight" is actually logging the min (not max) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
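The fix for the typo noted above can be sketched with a stand-in array playing the role of `labelSummarizer.histogram` (the real type is a `MultiClassSummarizer` histogram; this is purely illustrative):

```scala
// Stand-in for labelSummarizer.histogram (total label weight per class).
val histogram = Array(3.0, 7.0, 5.0)

val lowestLabelWeight = histogram.min.toString  // min is correct here
val highestLabelWeight = histogram.max.toString // the diff used .min here too
```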
[GitHub] spark issue #20786: [SPARK-14681][ML] Provide label/impurity stats for spark...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20786 Thanks for the updates! I had a spacing typo in the fromOld() style fix I wrote above, and there are some traits which still need to be sealed. Hope you don't mind, but I sent a PR to be explicit here: https://github.com/WeichenXu123/spark/pull/5
[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20964#discussion_r178896682 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala --- @@ -167,4 +166,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(precision == 1.0) assert(recall >= 0.7) } + + test("MinHashLSHModel.transform should work with Structured Streaming") { +val localSpark = spark +import localSpark.implicits._ + +val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0))) +model.set(model.inputCol, "keys") +testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) { + case Row(_: Vector, output: Seq[_]) => +assert(output.length === model.randCoefficients.length) +// no AND-amplification yet: SPARK-18450, so each hash output is of length 1 +output.foreach { + case hashOutput: Vector => assert(hashOutput.size === 1) +} +} --- End diff -- I don't think that's necessary for testing that this works with structured streaming. (I can't see how streaming would mess up the correctness of the algorithm.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20964#discussion_r178896432 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala --- @@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default ImputerSuite.iterateStrategyTest(imputer, df) } + test("Imputer should work with Structured Streaming") { +val localSpark = spark +import localSpark.implicits._ +val df = Seq[(java.lang.Double, Double)]( + (4.0, 4.0), + (10.0, 10.0), + (10.0, 10.0), + (Double.NaN, 8.0), + (null, 8.0) +).toDF("value", "expected_mean_value") --- End diff -- since it's nullable
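The boxed element type in the quoted test matters because Scala's primitive `Double` cannot hold `null`; only `java.lang.Double` makes the resulting DataFrame column nullable. A minimal sketch:

```scala
// Boxed type: null is a legal value, so .toDF would yield a nullable column.
val ok: (java.lang.Double, Double) = (null, 8.0)

// Primitive type: the compiler rejects null.
// val bad: (Double, Double) = (null, 8.0)   // does not compile

// NaN, by contrast, is an ordinary Double value, not a null.
val nan: (Double, Double) = (Double.NaN, 8.0)
```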
[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20964#discussion_r178894229 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala --- @@ -48,8 +46,8 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De .setMax(5) val model = scaler.fit(df) -model.transform(df).select("expected", "scaled").collect() - .foreach { case Row(vector1: Vector, vector2: Vector) => +testTransformer[(Vector, Vector)](df, model, "expected", "scaled") { + case Row(vector1: Vector, vector2: Vector) => assert(vector1.equals(vector2), "Transformed vector is different with expected.") --- End diff -- True, === is more standard --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20964#discussion_r178893936 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala --- @@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest { def testNGram(t: NGram, dataFrame: DataFrame): Unit = { testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") { - case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) => + case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) => --- End diff -- String is not actually checked because of erasure, so IntelliJ complained with a style warning before this change.
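The erasure point is easy to demonstrate in plain Scala: the `String` type argument in a pattern like `Seq[String]` is erased at runtime, so the match succeeds even for a `Seq[Int]`, which is exactly why the compiler warns and why `Seq[_]` is the honest spelling:

```scala
val xs: Any = Seq(1, 2, 3)

// Compiles with an "unchecked" warning: the String parameter is erased,
// so this case matches any Seq regardless of element type.
val matchedAsStrings = xs match {
  case _: Seq[String] => true
  case _ => false
}
// matchedAsStrings is true even though xs holds Ints.
```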
spark git commit: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Repository: spark Updated Branches: refs/heads/master 28ea4e314 -> a1351828d [SPARK-23690][ML] Add handleinvalid to VectorAssembler ## What changes were proposed in this pull request? Introduce `handleInvalid` parameter in `VectorAssembler` that can take in `"keep", "skip", "error"` options. "error" throws an error on seeing a row containing a `null`, "skip" filters out all such rows, and "keep" adds relevant number of NaN. "keep" figures out an example to find out what this number of NaN s should be added and throws an error when no such number could be found. ## How was this patch tested? Unit tests are added to check the behavior of `assemble` on specific rows and the transformer is called on `DataFrame`s of different configurations to test different corner cases. Author: Yogesh Garg Author: Bago Amirbekian Author: Yogesh Garg <1059168+yoge...@users.noreply.github.com> Closes #20829 from yogeshg/rformula_handleinvalid. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1351828 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1351828 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1351828 Branch: refs/heads/master Commit: a1351828d376a01e5ee0959cf608f767d756dd86 Parents: 28ea4e3 Author: Yogesh Garg Authored: Mon Apr 2 16:41:26 2018 -0700 Committer: Joseph K. 
Bradley Committed: Mon Apr 2 16:41:26 2018 -0700 -- .../apache/spark/ml/feature/StringIndexer.scala | 2 +- .../spark/ml/feature/VectorAssembler.scala | 198 +++ .../spark/ml/feature/VectorAssemblerSuite.scala | 131 ++-- 3 files changed, 284 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1351828/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 1cdcdfc..67cdb09 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -234,7 +234,7 @@ class StringIndexerModel ( val metadata = NominalAttribute.defaultAttr .withName($(outputCol)).withValues(filteredLabels).toMetadata() // If we are skipping invalid records, filter them out. -val (filteredDataset, keepInvalid) = getHandleInvalid match { +val (filteredDataset, keepInvalid) = $(handleInvalid) match { case StringIndexer.SKIP_INVALID => val filterer = udf { label: String => labelToIndex.contains(label) http://git-wip-us.apache.org/repos/asf/spark/blob/a1351828/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index b373ae9..6bf4aa3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -17,14 +17,17 @@ package org.apache.spark.ml.feature -import scala.collection.mutable.ArrayBuilder +import java.util.NoSuchElementException + +import scala.collection.mutable +import scala.language.existentials import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer import 
org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute} import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} -import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -33,10 +36,14 @@ import org.apache.spark.sql.types._ /** * A feature transformer that merges multiple columns into a vector column. + * + * This requires one pass over the entire dataset. In case we need to infer column lengths from the + * data we require an additional call to the 'first' Dataset method, see 'handleInvalid' parameter. */ @Since("1.4.0") class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) - extends Transformer with HasInputCols with HasOutputCol with DefaultParamsWritable { + extends Transformer with HasInputCols with HasOutputCol with HasHandleInvalid +with DefaultParamsWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("vecAssembler")) @@ -49,32 +56,63 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid:
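The three modes described in the commit message above can be summarized in a hedged usage sketch (assumes an existing SparkSession and a DataFrame `df` with numeric or vector columns "a" and "b", some rows holding null; this example is mine, not taken from the patch):

```scala
import org.apache.spark.ml.feature.VectorAssembler

// Sketch only: `df` is an assumed input DataFrame.
val assembler = new VectorAssembler()
  .setInputCols(Array("a", "b"))
  .setOutputCol("features")
  .setHandleInvalid("skip") // "error" (default): throw on rows with null
                            // "skip": filter such rows out
                            // "keep": pad with the relevant number of NaNs

val assembled = assembler.transform(df)
```

Note that, per the commit message, "keep" may need to inspect a first row to infer vector column lengths, and errors out when no such row exists; VectorSizeHint metadata avoids that extra pass.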
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20829 LGTM. Merging with master. Thanks @yogeshg for the PR and @WeichenXu123 for taking a look!
[GitHub] spark pull request #20964: [SPARK-22883] ML test for StructuredStreaming: sp...
GitHub user jkbradley opened a pull request: https://github.com/apache/spark/pull/20964 [SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M ## What changes were proposed in this pull request? Adds structured streaming tests using testTransformer for these suites: * IDF * Imputer * Interaction * MaxAbsScaler * MinHashLSH * MinMaxScaler * NGram ## How was this patch tested? It is a bunch of tests! You can merge this pull request into a Git repository by running: $ git pull https://github.com/jkbradley/spark SPARK-22883-part2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20964.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20964 commit c74cd89c220ea6a679a927c3d3caf1c4f3e24c0b Author: Joseph K. Bradley Date: 2018-03-30T17:44:31Z partly done commit 26f7519e31853b2abc8807e32eacd015164dd44d Author: Joseph K. Bradley Date: 2018-04-02T23:34:28Z remaining tests commit 778e822ad6ef8e029dc074981ae396feafed2962 Author: Joseph K. Bradley Date: 2018-04-02T23:39:10Z reverted MLTest change --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178676396 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- Oh, I see; you're right. Does this mean the latest change removes the space though? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620282 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getVectorLengthsFromFirstRow(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +s"""Can not infer column lengths for 'keep invalid' mode. Consider using --- End diff -- ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620200 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { --- End diff -- ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620104 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620142 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178619977 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], --- End diff -- Ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178619893 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { --- End diff -- This still isn't correct; look at other MLlib code for examples. It should be: ``` private[feature] def getVectorLengthsFromFirstRow( dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178202438 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,85 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = 
gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +@Since("2.4.0") +trait ClassificationNode extends Node { + + /** + * Get count for specified label in this node + * @param label label number in the range [0, numClasses) + */ + @Since("2.4.0") + def getLabelCount(label: Int): Double = { +require(label >= 0 && label < impurityStats.stats.length, + "label should be in the rangle between 0 (inclusive) " + --- End diff -- rangle -> range --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178202596

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

[...]
+@Since("2.4.0")
+trait RegressionNode extends Node {
+
+  /** Number of data points in this node */

--- End diff --

"data points" -> "training data points" (for other methods too)
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178204351

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

[...]
+@Since("2.4.0")
+trait LeafNode extends Node {
+
+  /** Prediction this node makes. */
+  def prediction: Double
+
+  /** Impurity measure at this node (for training data) */
+  def impurity: Double

--- End diff --

This can be omitted since the docstring matches that in Node.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178202260

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

@@ -84,35 +86,73 @@ private[ml] object Node {

   /**
    * Create a new Node from the old Node format, recursively creating child nodes as needed.
    */
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],

--- End diff --

For multi-line class and method headers, put the first argument on the next line, and put each other argument on its own line. Just look at other code examples in MLlib. This should be:

```
def fromOld(
    oldNode: OldNode,
    categoricalFeatures: Map[Int, Int],
    isClassification: Boolean): Node = {
```
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178202685

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

[...]
+@Since("2.4.0")
+trait RegressionNode extends Node {
+
+  /** Number of data points in this node */
+  @Since("2.4.0")
+  def getCount: Double = impurityStats.stats(0)
+
+  /** Sum of data points labels in this node */
+  @Since("2.4.0")
+  def getSum: Double = impurityStats.stats(1)
+
+  /** Sum of data points label squares in this node */

--- End diff --

"Sum of data points label squares" -> "Sum over training data points of the square of the labels"
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178204775

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

@@ -17,9 +17,11 @@ package org.apache.spark.ml.tree

+import org.apache.spark.annotation.Since
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
-import org.apache.spark.mllib.tree.model.{ImpurityStats, InformationGainStats => OldInformationGainStats, Node => OldNode, Predict => OldPredict}
+import org.apache.spark.mllib.tree.model.{ImpurityStats, InformationGainStats => OldInformationGainStats,
+  Node => OldNode, Predict => OldPredict}

 /**
  * Decision tree node interface.

--- End diff --

We should make Node a trait, not an abstract class. I didn't realize that traits could inherit from abstract classes; weird...
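Since that comment turns on traits extending abstract classes, here is a minimal self-contained Scala sketch of the pattern (hypothetical names, not the actual ml.tree hierarchy): a trait may declare an abstract class as its supertype, and any class mixing in the trait is then also a subclass of that abstract class.

```scala
// Hypothetical names for illustration only.
abstract class BaseNode {
  def prediction: Double // abstract member provided by concrete subclasses
}

// A trait *can* extend an abstract class; its members may use BaseNode's API.
trait ClassificationLike extends BaseNode {
  def describe: String = s"classification node predicting $prediction"
}

// Extending the trait implicitly makes LeafSketch a subclass of BaseNode.
class LeafSketch(val prediction: Double) extends ClassificationLike

object TraitDemo {
  def main(args: Array[String]): Unit = {
    println(new LeafSketch(1.0).describe) // classification node predicting 1.0
  }
}
```

The design consequence noted in the review: once Node is a trait, ClassificationNode and RegressionNode can refine it without forcing a single abstract-class inheritance chain.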
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r178202528

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---

[...]
+@Since("2.4.0")
+trait ClassificationNode extends Node {
+
+  /**
+   * Get count for specified label in this node

--- End diff --

"count" -> "count of training examples"
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177542325

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {

+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+    Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+      dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+    try {
+      val first_row = dataset.toDF().select(columns.map(col): _*).first()
+      columns.zip(first_row.toSeq).map {
+        case (c, x) => c -> x.asInstanceOf[Vector].size
+      }.toMap
+    } catch {
+      case e: NullPointerException => throw new NullPointerException(
+        s"""Encountered null value while inferring lengths from the first row. Consider using
+           |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
+      case e: NoSuchElementException => throw new NoSuchElementException(
+        s"""Encountered empty dataframe while inferring lengths from the first row. Consider using
+           |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
+    }
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],

--- End diff --

scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation
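As a hedged illustration of that style rule (the names and body below are illustrative, not the actual VectorAssembler code), a multi-line method header would be laid out as:

```scala
object StyleSketch {
  // Spark Scala style for multi-line method headers: the first parameter
  // moves to the next line, one parameter per line, indented +4 spaces
  // relative to `def`, with the return type on the last parameter line.
  def getLengths(
      names: Seq[String],
      sizes: Map[String, Int],
      default: Int): Map[String, Int] = {
    names.map(n => n -> sizes.getOrElse(n, default)).toMap
  }

  def main(args: Array[String]): Unit = {
    println(getLengths(Seq("a", "b"), Map("a" -> 3), -1))
    // Map(a -> 3, b -> -1)
  }
}
```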
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543971

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+      handleInvalid: String) = {
+    val group_sizes = columns.map { c =>
+      c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+    }.toMap
+    val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+    val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {

--- End diff --

nit: In Scala, use camelCase: first_sizes -> firstSizes
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177500915

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
 /**
- * A feature transformer that merges multiple columns into a vector column.
+ * A feature transformer that merges multiple columns into a vector column. This requires one pass

--- End diff --

style nit: Move new text here into a new paragraph below. That will give nicer "pyramid-style" formatting with essential info separated from details.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177542280

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
+  private[feature] def getVectorLengthsFromFirstRow(
+      dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {

--- End diff --

scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177505970

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
         case _: VectorUDT =>
-          val group = AttributeGroup.fromStructField(field)
-          if (group.attributes.isDefined) {
-            // If attributes are defined, copy them with updated names.
-            group.attributes.get.zipWithIndex.map { case (attr, i) =>
+          val attributeGroup = AttributeGroup.fromStructField(field)

--- End diff --

for the future, I'd avoid renaming things like this unless it's really unclear or needed (to make diffs shorter)
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177503206

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
+    val vectorCols = $(inputCols).toSeq.filter { c =>

--- End diff --

nit: Is toSeq extraneous?
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177501836

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with

--- End diff --

I'd recommend we deal with NaNs now. This PR is already dealing with some NaN cases: Dataset.na.drop handles NaNs in NumericType columns (but not VectorUDT columns). I'm Ok with postponing incorrect vector lengths until later or doing that now since that work will be more separate.
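The na.drop distinction above can be sketched as follows. This is an illustrative snippet, not part of the PR; it assumes a local SparkSession and made-up column names, and it shows that drop removes rows with null/NaN in the listed NumericType columns but does not inspect values inside a VectorUDT column.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object NaDropSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("naDropSketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (Double.NaN, Vectors.dense(1.0)),  // NaN in a numeric column: row is dropped
      (1.0, Vectors.dense(Double.NaN)),  // NaN hidden inside a vector: row is kept
      (2.0, Vectors.dense(2.0))
    ).toDF("num", "vec")

    // Only the first row is removed; na.drop never looks inside "vec".
    df.na.drop(Seq("num", "vec")).show()
    spark.stop()
  }
}
```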
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177547373

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---

[...]
+    val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+      case (true, VectorAssembler.ERROR_INVALID) =>
+        getVectorLengthsFromFirstRow(dataset, missing_columns)
+      case (true, VectorAssembler.SKIP_INVALID) =>
+        getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns)
+      case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+        s"""Can not infer column lengths for 'keep invalid' mode. Consider using

--- End diff --

nit: Refer to specific Param value: "Cannot infer column lengths for mode handleInvalid = "keep""
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543316 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543289 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- nit: Put space in between this message and e.toString --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177559339 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { --- End diff -- style: use camelCase --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177559587 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { + val attributeY = new AttributeGroup("y", 2) + val subAttributesOfZ = Array(NumericAttribute.defaultAttr, NumericAttribute.defaultAttr) --- End diff -- unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177547735 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getVectorLengthsFromFirstRow(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +s"""Can not infer column lengths for 'keep invalid' mode. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ")) + case (_, _) => Map.empty +} +group_sizes ++ first_sizes + } + + @Since("1.6.0") override def load(path: String): VectorAssembler = super.load(path) - private[feature] def assemble(vv: Any*): Vector = { + /** + * Returns a UDF that has the required information to assemble each row. --- End diff -- nit: When people say "UDF," they generally mean a Spark SQL UDF. This is just a function, not a SQL UDF. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177504904 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with + * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the + * output). Column lengths are taken from the size of ML Attribute Group, which can be set using + * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred + * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. + * Default: "error" + * @group param + */ + @Since("2.4.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +""" +| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with +| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the +| output). Column lengths are taken from the size of ML Attribute Group, which can be set using +| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred +| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. +| """.stripMargin.replaceAll("\n", " "), +ParamValidators.inArray(VectorAssembler.supportedHandleInvalids)) + + setDefault(handleInvalid, VectorAssembler.ERROR_INVALID) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // Schema transformation. 
val schema = dataset.schema -lazy val first = dataset.toDF.first() -val attrs = $(inputCols).flatMap { c => + +val vectorCols = $(inputCols).toSeq.filter { c => + schema(c).dataType match { +case _: VectorUDT => true +case _ => false + } +} +val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid)) + +val featureAttributesMap = $(inputCols).toSeq.map { c => --- End diff -- I think the flatMap is simpler, or at least a more common pattern in Spark and Scala (rather than having nested sequences which are then flattened). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
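The reviewer's point about `flatMap` is that it collapses the nested per-column sequences in one pass, rather than building nested sequences and flattening them afterwards. A small sketch with hypothetical column data (not the actual attribute-building code):

```python
# Hypothetical columns with their lengths; each column expands to one
# attribute name per slot.
cols = [("x", 1), ("y", 2)]

# map-then-flatten: produces nested lists that need a second pass
nested = [["%s_%d" % (c, i) for i in range(n)] for c, n in cols]
flattened = [attr for group in nested for attr in group]

# the flatMap style expresses the same expansion directly
flat = ["%s_%d" % (c, i) for c, n in cols for i in range(n)]

assert flat == flattened  # ['x_0', 'y_0', 'y_1']
```

Both forms are equivalent; the flat form is the more common idiom in Spark and Scala code.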
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177558064 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -18,56 +18,68 @@ package org.apache.spark.ml.feature import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, NumericAttribute} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute, NumericAttribute} import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions.{col, udf} class VectorAssemblerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import testImplicits._ + @transient var dfWithNulls: Dataset[_] = _ + + override def beforeAll(): Unit = { +super.beforeAll() +dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long, String)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L, null), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L, null), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L, null), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L, null)) + .toDF("id1", "id2", "x", "y", "name", "z", "n", "nulls") + } + test("params") { ParamsSuite.checkParams(new VectorAssembler) } test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Array(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) 
+assert(assemble(Array(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Array(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Array(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Array(1), true)(v)) + intercept[SparkException](assemble(Array(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Array(1, 1, 1, 1), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) assert(v1.isInstanceOf[SparseVector]) -val v2 = assemble(1.0, 2.0, 3.0, Vectors.sparse(1, Array(0), Array(4.0))) +val sv = Vectors.sparse(1, Array(0), Array(4.0)) +val v2 = assemble(Array(1, 1, 1, 1), true)(1.0, 2.0, 3.0, sv) assert(v2.isInstanceOf[DenseVector]) } test("VectorAssembler") { -val df = Seq( - (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 10L) -).toDF("id", "x", "y", "name", "z", "n") +val df = dfWithNulls.filter("id1 == 1").withColumn("id", col("id1")) --- End diff -- nit: If this is for consolidation, I'm actually against this little change since it obscures what this test is doing and moves the input Row farther from the expected output row. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
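The "assemble should compress vectors" test above relies on the assembled vector choosing the cheaper representation by nonzero count. A sketch of that decision, assuming the `1.5 * (nnz + 1) < size` heuristic that MLlib's `Vector.compressed` uses (hypothetical Python, tuples standing in for sparse/dense vectors):

```python
def compress(values):
    """Sketch: pick sparse vs dense by estimated storage cost."""
    nnz = sum(1 for v in values if v != 0.0)
    if 1.5 * (nnz + 1) < len(values):
        # sparse wins: store (size, indices, nonzero values)
        idx = [i for i, v in enumerate(values) if v != 0.0]
        return ("sparse", len(values), idx, [values[i] for i in idx])
    return ("dense", list(values))

print(compress([0.0, 0.0, 0.0, 4.0])[0])  # sparse
print(compress([1.0, 2.0, 3.0, 4.0])[0])  # dense
```

This matches the test's expectations: mostly-zero input compresses to a `SparseVector`, mostly-nonzero input stays a `DenseVector`.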
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543627 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], --- End diff -- style: state return value explicitly (This isn't completely consistent in Spark, but we try to in MLlib) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177560225 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { + val attributeY = new AttributeGroup("y", 2) + val subAttributesOfZ = Array(NumericAttribute.defaultAttr, NumericAttribute.defaultAttr) + val attributeZ = new AttributeGroup( +"z", +Array[Attribute]( + NumericAttribute.defaultAttr.withName("foo"), + NumericAttribute.defaultAttr.withName("bar"))) + val dfWithMetadata = dfWithNulls.withColumn("y", col("y"), attributeY.toMetadata()) +.withColumn("z", col("z"), attributeZ.toMetadata()).filter(additional_filter) + val output = assembler.setHandleInvalid(mode).transform(dfWithMetadata) + output.collect() + output +} +def run_with_first_row(mode: String): Dataset[_] = { --- End diff -- style: Put empty line between functions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite
Repository: spark Updated Branches: refs/heads/master 2b89e4aa2 -> a091ee676 [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite ## What changes were proposed in this pull request? Fix lint-java from https://github.com/apache/spark/pull/19108 addition of JavaKolmogorovSmirnovTestSuite Author: Joseph K. Bradley Closes #20875 from jkbradley/kstest-lint-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a091ee67 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a091ee67 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a091ee67 Branch: refs/heads/master Commit: a091ee676b8707819e94d92693956237310a6145 Parents: 2b89e4a Author: Joseph K. Bradley Authored: Wed Mar 21 13:52:03 2018 -0700 Committer: Joseph K. Bradley Committed: Wed Mar 21 13:52:03 2018 -0700 -- .../apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java | 7 --- 1 file changed, 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a091ee67/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java index 021272d..830f668 100644 --- a/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/stat/JavaKolmogorovSmirnovTestSuite.java @@ -18,18 +18,11 @@ package org.apache.spark.ml.stat; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.commons.math3.distribution.NormalDistribution; -import org.apache.spark.ml.linalg.VectorUDT; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import 
org.apache.spark.sql.types.StructType; import org.junit.Test; import org.apache.spark.SharedSparkSession; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] spark issue #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTest...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20875 Merging with master. Thanks for checking, @adrian-ionescu!
[GitHub] spark issue #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTest...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20875 CC @WeichenXu123 @MrBago @adrian-ionescu