Github user MLnick commented on a diff in the pull request:
https://github.com/apache/spark/pull/19715#discussion_r157156926
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala ---
@@ -146,4 +147,258 @@ class QuantileDiscretizerSuite
     val model = discretizer.fit(df)
     assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes " +
+    "match expected values") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val datasetSize = 100000
+    val numBuckets = 5
+    val data1 = Array.range(1, 100001, 1).map(_.toDouble)
+    val data2 = Array.range(1, 200000, 2).map(_.toDouble)
+    val df = data1.zip(data2).toSeq.toDF("input1", "input2")
+
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("result1", "result2"))
+      .setNumBuckets(numBuckets)
+    val result = discretizer.fit(df).transform(df)
+
+    val relativeError = discretizer.getRelativeError
+    val isGoodBucket = udf {
+      (size: Int) => math.abs(size - (datasetSize / numBuckets)) <= (relativeError * datasetSize)
+    }
+
+    for (i <- 1 to 2) {
+      val observedNumBuckets = result.select("result" + i).distinct.count
+      assert(observedNumBuckets === numBuckets,
+        "Observed number of buckets does not equal expected number of buckets.")
+
+      val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count
+      assert(numGoodBuckets === numBuckets,
+        "Bucket sizes are not within expected relative error tolerance.")
+    }
+  }
+
+ test("Multiple Columns: Test on data with high proportion of duplicated
values") {
+ val spark = this.spark
+ import spark.implicits._
+
+ val numBuckets = 5
+ val expectedNumBucket = 3
+ val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0,
1.0, 3.0)
+ val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0,
1.0, 2.0)
+ val df = data1.zip(data2).toSeq.toDF("input1", "input2")
+ val discretizer = new QuantileDiscretizer()
+ .setInputCols(Array("input1", "input2"))
+ .setOutputCols(Array("result1", "result2"))
+ .setNumBuckets(numBuckets)
+ val result = discretizer.fit(df).transform(df)
+ for (i <- 1 to 2) {
+ val observedNumBuckets = result.select("result" + i).distinct.count
+ assert(observedNumBuckets == expectedNumBucket,
+ s"Observed number of buckets are not correct." +
+ s" Expected $expectedNumBucket but found ($observedNumBuckets")
+ }
+ }
+
+  test("Multiple Columns: Test transform on data with NaN value") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBuckets = 3
+    val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9,
+      Double.NaN, Double.NaN, Double.NaN)
+    val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0)
+    val expectedSkip1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0)
+    val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5,
+      Double.NaN, Double.NaN, Double.NaN)
+    val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0)
+    val expectedSkip2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0)
+
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("result1", "result2"))
+      .setNumBuckets(numBuckets)
+
+    withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") {
+      val dataFrame: DataFrame = validData1.zip(validData2).toSeq.toDF("input1", "input2")
+      intercept[SparkException] {
+        discretizer.fit(dataFrame).transform(dataFrame).collect()
+      }
+    }
+
+ List(("keep", expectedKeep1, expectedKeep2), ("skip", expectedSkip1,
expectedSkip2)).foreach {
+ case (u, v, w) =>
+ discretizer.setHandleInvalid(u)
+ val dataFrame: DataFrame =
validData1.zip(validData2).zip(v).zip(w).map {
+ case (((a, b), c), d) => (a, b, c, d)
+ }.toSeq.toDF("input1", "input2", "expected1", "expected2")
+ dataFrame.show
+ val result = discretizer.fit(dataFrame).transform(dataFrame)
+ result.show
+ result.select("result1", "expected1", "result2",
"expected2").collect().foreach {
+ case Row(x: Double, y: Double, z: Double, w: Double) =>
+ assert(x === y && w === z)
+ }
+ }
+ }
+
+  test("Multiple Columns: Test numBucketsArray") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBucketsArray: Array[Int] = Array(2, 5, 10)
+    val data1 = Array.range(1, 21, 1).map(_.toDouble)
+    val expected1 = Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0,
+      1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+    val data2 = Array.range(1, 40, 2).map(_.toDouble)
+    val expected2 = Array(0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0,
+      2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
+    val data3 = Array.range(1, 60, 3).map(_.toDouble)
+    val expected3 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0,
+      5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
+    val data = (0 until 20).map { idx =>
+      (data1(idx), data2(idx), data3(idx), expected1(idx), expected2(idx), expected3(idx))
+    }
+    val df = data.toDF("input1", "input2", "input3", "expected1", "expected2", "expected3")
+
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2", "input3"))
+      .setOutputCols(Array("result1", "result2", "result3"))
+      .setNumBucketsArray(numBucketsArray)
+
+    discretizer.fit(df).transform(df)
+      .select("result1", "expected1", "result2", "expected2", "result3", "expected3")
+      .collect().foreach {
+        case Row(r1: Double, e1: Double, r2: Double, e2: Double, r3: Double, e3: Double) =>
+          assert(r1 === e1,
+            s"The result value is not correct after bucketing. Expected $e1 but found $r1")
+          assert(r2 === e2,
+            s"The result value is not correct after bucketing. Expected $e2 but found $r2")
+          assert(r3 === e3,
+            s"The result value is not correct after bucketing. Expected $e3 but found $r3")
+      }
+  }
+
+  test("Multiple Columns: Compare single/multiple column(s) QuantileDiscretizer in pipeline") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBucketsArray: Array[Int] = Array(2, 5, 10)
+    val data1 = Array.range(1, 21, 1).map(_.toDouble)
+    val data2 = Array.range(1, 40, 2).map(_.toDouble)
+    val data3 = Array.range(1, 60, 3).map(_.toDouble)
+    val data = (0 until 20).map { idx =>
+      (data1(idx), data2(idx), data3(idx))
+    }
+    val df = data.toDF("input1", "input2", "input3")
+
+    val multiColsDiscretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2", "input3"))
+      .setOutputCols(Array("result1", "result2", "result3"))
+      .setNumBucketsArray(numBucketsArray)
+    val plForMultiCols = new Pipeline()
+      .setStages(Array(multiColsDiscretizer))
+      .fit(df)
+
+    val discretizerForCol1 = new QuantileDiscretizer()
+      .setInputCol("input1")
+      .setOutputCol("result1")
+      .setNumBuckets(numBucketsArray(0))
+
+    val discretizerForCol2 = new QuantileDiscretizer()
+      .setInputCol("input2")
+      .setOutputCol("result2")
+      .setNumBuckets(numBucketsArray(1))
+
+    val discretizerForCol3 = new QuantileDiscretizer()
+      .setInputCol("input3")
+      .setOutputCol("result3")
+      .setNumBuckets(numBucketsArray(2))
+
+    val plForSingleCol = new Pipeline()
+      .setStages(Array(discretizerForCol1, discretizerForCol2, discretizerForCol3))
+      .fit(df)
+
+    val resultForMultiCols = plForMultiCols.transform(df)
+      .select("result1", "result2", "result3")
+      .collect()
+
+    val resultForSingleCol = plForSingleCol.transform(df)
+      .select("result1", "result2", "result3")
+      .collect()
+
+    resultForSingleCol.zip(resultForMultiCols).foreach {
+      case (rowForSingle, rowForMultiCols) =>
+        assert(rowForSingle.getDouble(0) == rowForMultiCols.getDouble(0) &&
+          rowForSingle.getDouble(1) == rowForMultiCols.getDouble(1) &&
+          rowForSingle.getDouble(2) == rowForMultiCols.getDouble(2))
+    }
+  }
+
+  test("Multiple Columns: Comparing setting numBuckets with setting numBucketsArray " +
+    "explicitly with identical values") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val datasetSize = 20
+    val numBucketsArray: Array[Int] = Array(2, 5, 10)
+    val data1 = Array.range(1, 21, 1).map(_.toDouble)
+    val data2 = Array.range(1, 40, 2).map(_.toDouble)
+    val data3 = Array.range(1, 60, 3).map(_.toDouble)
+    val data = (0 until 20).map { idx =>
--- End diff ---
Can we use `datasetSize` here? Or remove `datasetSize`, since it's unused.
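
If we keep it, the first option would look something like this (a sketch only, assuming the tuple construction matches the preceding test):

```scala
val datasetSize = 20
val data1 = Array.range(1, 21, 1).map(_.toDouble)
val data2 = Array.range(1, 40, 2).map(_.toDouble)
val data3 = Array.range(1, 60, 3).map(_.toDouble)

// Index off datasetSize instead of the bare literal 20, so the range
// and the val cannot silently drift apart.
val data = (0 until datasetSize).map { idx =>
  (data1(idx), data2(idx), data3(idx))
}
```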