Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157156926 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +147,258 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { + val spark = this.spark + import spark.implicits._ + + val datasetSize = 100000 + val numBuckets = 5 + val data1 = Array.range(1, 100001, 1).map(_.toDouble) + val data2 = Array.range(1, 200000, 2).map(_.toDouble) + val df = data1.zip(data2).toSeq.toDF("input1", "input2") + + val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) + val result = discretizer.fit(df).transform(df) + + val relativeError = discretizer.getRelativeError + val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) + } + + for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, + "Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, + "Bucket sizes are not within expected relative error tolerance.") + } + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { + val spark = this.spark + import spark.implicits._ + + val numBuckets = 5 + val expectedNumBucket = 3 + val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) + val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) + val df = data1.zip(data2).toSeq.toDF("input1", "input2") + val discretizer = new QuantileDiscretizer() + 
.setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) + val result = discretizer.fit(df).transform(df) + for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets == expectedNumBucket, + s"Observed number of buckets are not correct." + + s" Expected $expectedNumBucket but found ($observedNumBuckets") + } + } + + test("Multiple Columns: Test transform on data with NaN value") { + val spark = this.spark + import spark.implicits._ + + val numBuckets = 3 + val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) + val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0) + val expectedSkip1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0) + val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, Double.NaN, Double.NaN, Double.NaN) + val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0) + val expectedSkip2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0) + + val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) + + withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") { + val dataFrame: DataFrame = validData1.zip(validData2).toSeq.toDF("input1", "input2") + intercept[SparkException] { + discretizer.fit(dataFrame).transform(dataFrame).collect() + } + } + + List(("keep", expectedKeep1, expectedKeep2), ("skip", expectedSkip1, expectedSkip2)).foreach { + case (u, v, w) => + discretizer.setHandleInvalid(u) + val dataFrame: DataFrame = validData1.zip(validData2).zip(v).zip(w).map { + case (((a, b), c), d) => (a, b, c, d) + }.toSeq.toDF("input1", "input2", "expected1", "expected2") + dataFrame.show + val result = discretizer.fit(dataFrame).transform(dataFrame) + result.show + result.select("result1", "expected1", "result2", 
"expected2").collect().foreach { + case Row(x: Double, y: Double, z: Double, w: Double) => + assert(x === y && w === z) + } + } + } + + test("Multiple Columns: Test numBucketsArray") { + val spark = this.spark + import spark.implicits._ + + val numBucketsArray: Array[Int] = Array(2, 5, 10) + val data1 = Array.range(1, 21, 1).map(_.toDouble) + val expected1 = Array (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0) + val data2 = Array.range(1, 40, 2).map(_.toDouble) + val expected2 = Array (0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0) + val data3 = Array.range(1, 60, 3).map(_.toDouble) + val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0, + 5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0) + val data = (0 until 20).map { idx => + (data1(idx), data2(idx), data3(idx), expected1(idx), expected2(idx), expected3(idx)) + } + val df = + data.toDF("input1", "input2", "input3", "expected1", "expected2", "expected3") + + val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2", "input3")) + .setOutputCols(Array("result1", "result2", "result3")) + .setNumBucketsArray(numBucketsArray) + + discretizer.fit(df).transform(df). + select("result1", "expected1", "result2", "expected2", "result3", "expected3") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double, r3: Double, e3: Double) => + assert(r1 === e1, + s"The result value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The result value is not correct after bucketing. Expected $e2 but found $r2") + assert(r3 === e3, + s"The result value is not correct after bucketing. 
Expected $e3 but found $r3") + } + } + + test("Multiple Columns: Compare single/multiple column(s) QuantileDiscretizer in pipeline") { + val spark = this.spark + import spark.implicits._ + + val numBucketsArray: Array[Int] = Array(2, 5, 10) + val data1 = Array.range(1, 21, 1).map(_.toDouble) + val data2 = Array.range(1, 40, 2).map(_.toDouble) + val data3 = Array.range(1, 60, 3).map(_.toDouble) + val data = (0 until 20).map { idx => + (data1(idx), data2(idx), data3(idx)) + } + val df = + data.toDF("input1", "input2", "input3") + + val multiColsDiscretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2", "input3")) + .setOutputCols(Array("result1", "result2", "result3")) + .setNumBucketsArray(numBucketsArray) + val plForMultiCols = new Pipeline() + .setStages(Array(multiColsDiscretizer)) + .fit(df) + + val discretizerForCol1 = new QuantileDiscretizer() + .setInputCol("input1") + .setOutputCol("result1") + .setNumBuckets(numBucketsArray(0)) + + val discretizerForCol2 = new QuantileDiscretizer() + .setInputCol("input2") + .setOutputCol("result2") + .setNumBuckets(numBucketsArray(1)) + + val discretizerForCol3 = new QuantileDiscretizer() + .setInputCol("input3") + .setOutputCol("result3") + .setNumBuckets(numBucketsArray(2)) + + val plForSingleCol = new Pipeline() + .setStages(Array(discretizerForCol1, discretizerForCol2, discretizerForCol3)) + .fit(df) + + val resultForMultiCols = plForMultiCols.transform(df) + .select("result1", "result2", "result3") + .collect() + + val resultForSingleCol = plForSingleCol.transform(df) + .select("result1", "result2", "result3") + .collect() + + resultForSingleCol.zip(resultForMultiCols).foreach { + case (rowForSingle, rowForMultiCols) => + assert(rowForSingle.getDouble(0) == rowForMultiCols.getDouble(0) && + rowForSingle.getDouble(1) == rowForMultiCols.getDouble(1) && + rowForSingle.getDouble(2) == rowForMultiCols.getDouble(2)) + } + } + + test("Multiple Columns: Comparing setting numBuckets with setting 
numBucketsArray " + + "explicitly with identical values") { + val spark = this.spark + import spark.implicits._ + + val datasetSize = 20 + val numBucketsArray: Array[Int] = Array(2, 5, 10) + val data1 = Array.range(1, 21, 1).map(_.toDouble) + val data2 = Array.range(1, 40, 2).map(_.toDouble) + val data3 = Array.range(1, 60, 3).map(_.toDouble) + val data = (0 until 20).map { idx => --- End diff -- can use `datasetSize` here? Or remove `datasetSize` as it's unused.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org