Repository: spark
Updated Branches:
  refs/heads/master 439c69511 -> d4a0895c6


[SPARK-22884][ML] ML tests for StructuredStreaming: spark.ml.clustering

## What changes were proposed in this pull request?

Converts the clustering test suites so they also check the code under structured streaming, using 
the ML testing infrastructure implemented in SPARK-22882.

This PR is a new version of https://github.com/apache/spark/pull/20319
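
A minimal sketch (not part of this patch) of the SPARK-22882 test pattern the converted suites use is shown below. The suite name, the two-point toy dataset, and the choice of `KMeans` are illustrative assumptions; `testTransformer` and `testTransformerByGlobalCheckFunc` are the `MLTest` helpers visible in the diff, which run each check against both the batch DataFrame and a streaming copy of it.

```scala
// Illustrative sketch only, assuming the MLTest infrastructure from SPARK-22882.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.MLTest
import org.apache.spark.sql.Row

class ExampleClusteringSuite extends MLTest {  // hypothetical suite name

  import testImplicits._

  test("predictions on batch and streaming data") {
    val df = Seq(
      Tuple1(Vectors.dense(0.0, 0.0)),
      Tuple1(Vectors.dense(5.0, 5.0))
    ).toDF("features")
    val model = new KMeans().setK(2).setSeed(1).fit(df)

    // Per-row check: MLTest applies it to the transformed batch output
    // and to the same data fed through a streaming query.
    testTransformer[Tuple1[Vector]](df, model, "features", "prediction") {
      case Row(_, prediction: Int) => assert(prediction === 0 || prediction === 1)
    }

    // Global check over all collected output rows, for properties of the whole result set.
    testTransformerByGlobalCheckFunc[Tuple1[Vector]](df, model, "prediction") { rows =>
      assert(rows.map(_.getInt(0)).toSet.subsetOf(Set(0, 1)))
    }
  }
}
```

In the suites below, this pattern replaces the previous `model.transform(...).collect()` assertions, so the same assertions now also exercise the streaming code path.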

Author: Sandor Murakozi <smurak...@gmail.com>
Author: Joseph K. Bradley <jos...@databricks.com>

Closes #21358 from jkbradley/smurakozi-SPARK-22884.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4a0895c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4a0895c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4a0895c

Branch: refs/heads/master
Commit: d4a0895c628ca854895c3c35c46ed990af36ec61
Parents: 439c695
Author: Sandor Murakozi <smurak...@gmail.com>
Authored: Thu May 17 16:33:06 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Thu May 17 16:33:06 2018 -0700

----------------------------------------------------------------------
 .../ml/clustering/BisectingKMeansSuite.scala    | 41 ++++++++++----------
 .../ml/clustering/GaussianMixtureSuite.scala    | 22 ++++-------
 .../spark/ml/clustering/KMeansSuite.scala       | 31 +++++++--------
 .../apache/spark/ml/clustering/LDASuite.scala   | 21 ++++------
 4 files changed, 50 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index f3ff2af..81842af 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -19,17 +19,18 @@ package org.apache.spark.ml.clustering
 
 import scala.language.existentials
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.clustering.DistanceMeasure
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.Dataset
 
-class BisectingKMeansSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
+
+  import testImplicits._
 
   final val k = 5
   @transient var dataset: Dataset[_] = _
@@ -68,10 +69,13 @@ class BisectingKMeansSuite
 
     // Verify fit does not fail on very sparse data
     val model = bkm.fit(sparseDataset)
-    val result = model.transform(sparseDataset)
-    val numClusters = result.select("prediction").distinct().collect().length
-    // Verify we hit the edge case
-    assert(numClusters < k && numClusters > 1)
+
+    testTransformerByGlobalCheckFunc[Tuple1[Vector]](sparseDataset.toDF(), model, "prediction") {
+      rows =>
+        val numClusters = rows.distinct.length
+        // Verify we hit the edge case
+        assert(numClusters < k && numClusters > 1)
+    }
   }
 
   test("setter/getter") {
@@ -104,19 +108,16 @@ class BisectingKMeansSuite
     val bkm = new BisectingKMeans().setK(k).setPredictionCol(predictionColName).setSeed(1)
     val model = bkm.fit(dataset)
     assert(model.clusterCenters.length === k)
-
-    val transformed = model.transform(dataset)
-    val expectedColumns = Array("features", predictionColName)
-    expectedColumns.foreach { column =>
-      assert(transformed.columns.contains(column))
-    }
-    val clusters =
-      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
-    assert(clusters.size === k)
-    assert(clusters === Set(0, 1, 2, 3, 4))
     assert(model.computeCost(dataset) < 0.1)
     assert(model.hasParent)
 
+    testTransformerByGlobalCheckFunc[Tuple1[Vector]](dataset.toDF(), model,
+      "features", predictionColName) { rows =>
+      val clusters = rows.map(_.getAs[Int](predictionColName)).toSet
+      assert(clusters.size === k)
+      assert(clusters === Set(0, 1, 2, 3, 4))
+    }
+
     // Check validity of model summary
     val numRows = dataset.count()
     assert(model.hasSummary)

http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index d0d461a..0b91f50 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -23,16 +23,15 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.stat.distribution.MultivariateGaussian
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{Dataset, Row}
 
-class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
-  with DefaultReadWriteTest {
 
-  import testImplicits._
+class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
+
   import GaussianMixtureSuite._
+  import testImplicits._
 
   final val k = 5
   private val seed = 538009335
@@ -119,15 +118,10 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     assert(model.weights.length === k)
     assert(model.gaussians.length === k)
 
-    val transformed = model.transform(dataset)
-    val expectedColumns = Array("features", predictionColName, probabilityColName)
-    expectedColumns.foreach { column =>
-      assert(transformed.columns.contains(column))
-    }
-
     // Check prediction matches the highest probability, and probabilities sum to one.
-    transformed.select(predictionColName, probabilityColName).collect().foreach {
-      case Row(pred: Int, prob: Vector) =>
+    testTransformer[Tuple1[Vector]](dataset.toDF(), model,
+      "features", predictionColName, probabilityColName) {
+      case Row(_, pred: Int, prob: Vector) =>
         val probArray = prob.toArray
         val predFromProb = probArray.zipWithIndex.maxBy(_._1)._2
         assert(pred === predFromProb)

http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 680a7c2..2569e7a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -22,20 +22,21 @@ import scala.util.Random
 
 import org.dmg.pmml.{ClusteringModel, PMML}
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
-import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils, PMMLReadWriteTest}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
+import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans,
+  KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 private[clustering] case class TestRow(features: Vector)
 
-class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
-  with PMMLReadWriteTest {
+class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTest {
+
+  import testImplicits._
 
   final val k = 5
   @transient var dataset: Dataset[_] = _
@@ -109,15 +110,13 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
     val model = kmeans.fit(dataset)
     assert(model.clusterCenters.length === k)
 
-    val transformed = model.transform(dataset)
-    val expectedColumns = Array("features", predictionColName)
-    expectedColumns.foreach { column =>
-      assert(transformed.columns.contains(column))
+    testTransformerByGlobalCheckFunc[Tuple1[Vector]](dataset.toDF(), model,
+      "features", predictionColName) { rows =>
+      val clusters = rows.map(_.getAs[Int](predictionColName)).toSet
+      assert(clusters.size === k)
+      assert(clusters === Set(0, 1, 2, 3, 4))
     }
-    val clusters =
-      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
-    assert(clusters.size === k)
-    assert(clusters === Set(0, 1, 2, 3, 4))
+
     assert(model.computeCost(dataset) < 0.1)
     assert(model.hasParent)
 
@@ -149,9 +148,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
     model.setFeaturesCol(featuresColName).setPredictionCol(predictionColName)
 
     val transformed = model.transform(dataset.withColumnRenamed("features", featuresColName))
-    Seq(featuresColName, predictionColName).foreach { column =>
-      assert(transformed.columns.contains(column))
-    }
+    assert(transformed.schema.fieldNames.toSet === Set(featuresColName, predictionColName))
     assert(model.getFeaturesCol == featuresColName)
     assert(model.getPredictionCol == predictionColName)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4a0895c/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 4d84820..096b541 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -21,11 +21,9 @@ import scala.language.existentials
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql._
 
 object LDASuite {
@@ -61,7 +59,7 @@ object LDASuite {
 }
 
 
-class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class LDASuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -186,16 +184,11 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     assert(model.topicsMatrix.numCols === k)
     assert(!model.isDistributed)
 
-    // transform()
-    val transformed = model.transform(dataset)
-    val expectedColumns = Array("features", lda.getTopicDistributionCol)
-    expectedColumns.foreach { column =>
-      assert(transformed.columns.contains(column))
-    }
-    transformed.select(lda.getTopicDistributionCol).collect().foreach { r =>
-      val topicDistribution = r.getAs[Vector](0)
-      assert(topicDistribution.size === k)
-      assert(topicDistribution.toArray.forall(w => w >= 0.0 && w <= 1.0))
+    testTransformer[Tuple1[Vector]](dataset.toDF(), model,
+      "features", lda.getTopicDistributionCol) {
+      case Row(_, topicDistribution: Vector) =>
+        assert(topicDistribution.size === k)
+        assert(topicDistribution.toArray.forall(w => w >= 0.0 && w <= 1.0))
     }
 
     // logLikelihood, logPerplexity

