[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19715
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r158347717 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -386,19 +382,16 @@ class QuantileDiscretizerSuite testDefaultReadWrite(discretizer) } - test("Both inputCol and inputCols are set") { -val spark = this.spark -import spark.implicits._ -val discretizer = new QuantileDiscretizer() - .setInputCol("input") - .setOutputCol("result") - .setNumBuckets(3) - .setInputCols(Array("input1", "input2")) -val df = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) - .map(Tuple1.apply).toDF("input") -// When both inputCol and inputCols are set, we throw Exception. -intercept[Exception] { - discretizer.fit(df) + test("multiple columns: Both inputCol and inputCols are set") { +intercept[IllegalArgumentException] { + new QuantileDiscretizer().setInputCol("in").setInputCols(Array("in1", "in2")).getInOutCols --- End diff -- Thanks for spending so much time reviewing this PR. I will change this.
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r158239692 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (same diff as above) ... + test("multiple columns: Both inputCol and inputCols are set") { +intercept[IllegalArgumentException] { + new QuantileDiscretizer().setInputCol("in").setInputCols(Array("in1", "in2")).getInOutCols --- End diff -- I think I slightly prefer to actually test that the error is thrown during `transform`
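A minimal sketch of such a test (assuming the suite's usual `spark`/implicits setup; the column names and bucket count are illustrative). Since `getInOutCols` is invoked from `transformSchema`, the error surfaces when `fit` is called:

```scala
test("Both inputCol and inputCols are set: error surfaces at fit time") {
  val spark = this.spark
  import spark.implicits._
  val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0, 6.0).toDF("input")
  val discretizer = new QuantileDiscretizer()
    .setInputCol("input")
    .setOutputCol("result")
    .setNumBuckets(3)
    .setInputCols(Array("input1", "input2"))
  // transformSchema runs inside fit, so the require in getInOutCols fires here
  intercept[IllegalArgumentException] {
    discretizer.fit(df)
  }
}
```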
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157282154 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (quotes the same multiple-columns test-suite diff shown in full below; the comment text was lost when the archive truncated this message)
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157156875 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (quotes the same multiple-columns test-suite diff shown in full below; the comment text was lost when the archive truncated this message)
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157156926 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (quotes the same multiple-columns test-suite diff shown in full below; the comment text was lost when the archive truncated this message)
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157158204 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (quotes the same multiple-columns test-suite diff shown in full below; the comment text was lost when the archive truncated this message)
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157156113 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +147,258 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val df = data1.zip(data2).toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val df = data1.zip(data2).toSeq.toDF("input1", "input2") +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +val result = discretizer.fit(df).transform(df) +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets == expectedNumBucket, +s"Observed number of buckets are not correct." 
+ + s" Expected $expectedNumBucket but found ($observedNumBuckets") +} + } + + test("Multiple Columns: Test transform on data with NaN value") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 3 +val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) +val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0) +val expectedSkip1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0) +val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, Double.NaN, Double.NaN, Double.NaN) +val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0) +val expectedSkip2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0) + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) + +withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") { + val dataFrame: DataFrame = validData1.zip(validData2).toSeq.toDF("input1", "input2") + intercept[SparkException] { +discretizer.fit(dataFrame).transform(dataFrame).collect() + } +} + +List(("keep", expectedKeep1, expectedKeep2), ("skip", expectedSkip1, expectedSkip2)).foreach { + case (u, v, w) => +discretizer.setHandleInvalid(u) +val dataFrame: DataFrame = validData1.zip(validData2).zip(v).zip(w).map { + case (((a, b), c), d) => (a, b, c, d) +}.toSeq.toDF("input1", "input2", "expected1", "expected2") +dataFrame.show --- End diff -- remove the `show` call here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157156164 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- (same diff as above) ... +dataFrame.show +val result = discretizer.fit(dataFrame).transform(dataFrame) +result.show --- End diff -- here too
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r157158481 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +155,96 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && !isSet(outputCols)) || + (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && isSet(outputCols)), + "QuantileDiscretizer only supports setting either inputCol/outputCol or" +"inputCols/outputCols." +) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, --- End diff -- We should add a small test case for mismatched sizes of `inputCols` / `outputCols`.
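A minimal sketch of such a test (column names and bucket count are illustrative; the length mismatch is caught by the `require` above, again via `fit`):

```scala
test("Mismatched sizes of inputCols and outputCols") {
  val spark = this.spark
  import spark.implicits._
  val df = Seq((1.0, 2.0), (3.0, 4.0)).toDF("input1", "input2")
  val discretizer = new QuantileDiscretizer()
    .setInputCols(Array("input1", "input2"))
    .setOutputCols(Array("result1")) // two inputs, only one output
    .setNumBuckets(3)
  intercept[IllegalArgumentException] {
    discretizer.fit(df)
  }
}
```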
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156442207 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -107,11 +107,11 @@ private[feature] trait QuantileDiscretizerBase extends Params * possible that the number of buckets used will be smaller than this value, for example, if there * are too few distinct values of the input to create enough distinct quantiles. * Since 2.3.0, - * `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter. - * Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be - * printed and only `inputCol` will take effect, while `inputCols` will be ignored. To specify - * the number of bucketsfor each column , the `numBucketsArray ` parameter can be set, or if the - * number of buckets should be the same across columns, `numBuckets` can be set as a convenience. + * `QuantileDiscretizer` can map multiple columns at once by setting the `inputCols` parameter. + * Note that only one of `inputCol` and `inputCols` parameters can be set. If both of the + * `inputCol` and `inputCols` parameters are set, an Exception will be thrown. To specify the --- End diff -- @MLnick Thank you very much for your comments. I will change these.
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156356881 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- (same diff as above) ... + * Note that only one of `inputCol` and `inputCols` parameters can be set. If both of the + * `inputCol` and `inputCols` parameters are set, an Exception will be thrown. To specify the --- End diff -- Think we can simplify to "If both `inputCol` and `inputCols` are set, ..." (since we already said in the previous sentence that only one of the parameters can be set)
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156323615 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -168,20 +168,13 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.3.0") def setOutputCols(value: Array[String]): this.type = set(outputCols, value) - private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { -if (isSet(inputCols) && isSet(inputCol)) { - logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + -"`QuantileDiscretizer` will only map one column specified by `inputCol`") - false -} else if (isSet(inputCols)) { - true -} else { - false -} - } - private[feature] def getInOutCols: (Array[String], Array[String]) = { -if (!isQuantileDiscretizeMultipleColumns) { +require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && !isSet(outputCols)) || + (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && isSet(outputCols)), + "Only allow to set either inputCol/outputCol, or inputCols/outputCols" --- End diff -- I think a better message is something like `QuantileDiscretizer only supports setting either ...`
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156160532 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +156,102 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + --- End diff -- @WeichenXu123 I will change it to throw an exception. Thanks.
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156010806 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- (same diff as above) ... + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + --- End diff -- According to the discussion at JIRA SPARK-8418, we should throw an exception when both inputCol and inputCols are specified?
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r155744727 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +156,106 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretizer` will only map one column specified by `inputCol`") + false +} else if (isSet(inputCols)) { + true +} else { + false +} + } + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +if (!isQuantileDiscretizeMultipleColumns) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"inputCols number do not match outputCols") + ($(inputCols), $(outputCols)) +} + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { -SchemaUtils.checkNumericType(schema, $(inputCol)) -val inputFields = schema.fields -require(inputFields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() +val (inputColNames, outputColNames) = getInOutCols +val existingFields = schema.fields +var outputFields = existingFields +inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => --- End diff -- `map` can be `foreach` because there's no return value
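In other words, the same loop from the diff with `map` swapped for `foreach`:

```scala
inputColNames.zip(outputColNames).foreach { case (inputColName, outputColName) =>
  SchemaUtils.checkNumericType(schema, inputColName)
  require(existingFields.forall(_.name != outputColName),
    s"Output column $outputColName already exists.")
  val attr = NominalAttribute.defaultAttr.withName(outputColName)
  outputFields :+= attr.toStructField()
}
```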
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r154571718 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -105,9 +107,11 @@ private[feature] trait QuantileDiscretizerBase extends Params * possible that the number of buckets used will be smaller than this value, for example, if there * are too few distinct values of the input to create enough distinct quantiles. * Since 2.3.0, - * `QuantileDiscretizer` can also map multiple columns at once. Whether it goes to map a column or - * multiple columns, it depends on which parameter of `inputCol` and `inputCols` is set. When both - * are set, a log warning will be printed and by default it chooses `inputCol`. + * `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter. + * Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be + * printed and only `inputCol` will take effect, while `inputCols` will be ignored. To specify + * the number of bucketsfor each column , the `numBucketsArray ` parameter can be set, or if the --- End diff -- "bucketsfor" -> "buckets for" and remove the leading space from " number of buckets ..." on next line
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r155759105 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +156,106 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretizer` will only map one column specified by `inputCol`") + false +} else if (isSet(inputCols)) { + true +} else { + false +} + } + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +if (!isQuantileDiscretizeMultipleColumns) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"inputCols number do not match outputCols") + ($(inputCols), $(outputCols)) +} + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { -SchemaUtils.checkNumericType(schema, $(inputCol)) -val inputFields = schema.fields -require(inputFields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() +val (inputColNames, outputColNames) = getInOutCols +val existingFields = schema.fields +var outputFields = existingFields +inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => + SchemaUtils.checkNumericType(schema, inputColName) + require(existingFields.forall(_.name != outputColName), +s"Output column ${outputColName} already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + outputFields :+= attr.toStructField() +} StructType(outputFields) } @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { transformSchema(dataset.schema, logging = true) -val splits = dataset.stat.approxQuantile($(inputCol), - (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) --- End diff -- Looking at this now, the `Array.fill` approach probably adds needless complexity. But the multi-buckets case can perhaps still be cleaned up. 
How about something like this:

```scala
override def fit(dataset: Dataset[_]): Bucketizer = {
  transformSchema(dataset.schema, logging = true)
  val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
  if (isQuantileDiscretizeMultipleColumns) {
    val splitsArray = if (isSet(numBucketsArray)) {
      val probArrayPerCol = $(numBucketsArray).map { numOfBuckets =>
        (0.0 to 1.0 by 1.0 / numOfBuckets).toArray
      }
      val probabilityArray = probArrayPerCol.flatten.sorted.distinct
      val splitsArrayRaw = dataset.stat.approxQuantile($(inputCols),
        probabilityArray, $(relativeError))
      splitsArrayRaw.zip(probArrayPerCol).map { case (splits, probs) =>
        val probSet = probs.toSet
        val idxSet = probabilityArray.zipWithIndex.collect {
          case (p, idx) if probSet(p) => idx
        }.toSet
        splits.zipWithIndex.collect {
          case (s, idx) if idxSet(idx) => s
        }
      }
    } else {
      dataset.stat.approxQuantile($(inputCols),
        (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
    }
    bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits))
  } else {
    val splits = dataset.stat.approxQuantile($(inputCol),
      (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
    bucketizer.setSplits(getDistinctSplits(splits))
  }
  copyValues(bucketizer.setParent(this))
}
```

Then we don't
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r154133181 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretize` only map one column specified by `inputCol`") + false +} else if (isSet(inputCols)) { + true +} else { + false +} + } + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +if (!isQuantileDiscretizeMultipleColumns) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"inputCols number do not match outputCols") + ($(inputCols), $(outputCols)) +} + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { -SchemaUtils.checkNumericType(schema, $(inputCol)) -val inputFields = schema.fields -require(inputFields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() +val (inputColNames, outputColNames) = getInOutCols +val existingFields = schema.fields +var outputFields = existingFields +inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => + SchemaUtils.checkNumericType(schema, inputColName) + require(existingFields.forall(_.name != outputColName), +s"Output column ${outputColName} already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + outputFields :+= attr.toStructField() +} StructType(outputFields) } @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { transformSchema(dataset.schema, logging = true) -val splits = dataset.stat.approxQuantile($(inputCol), - (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) +if (isQuantileDiscretizeMultipleColumns) { --- End diff -- @MLnick Thank you very much for your comments. I have two sets of if/else clauses. Do you mean to combine the code paths for multiple columns and single column? Or combine the code paths for numBucketsArray and numBuckets? Or both of them?
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153774332 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -50,10 +50,26 @@ private[feature] trait QuantileDiscretizerBase extends Params /** @group getParam */ def getNumBuckets: Int = getOrDefault(numBuckets) + /** + * Array of number of buckets (quantiles, or categories) into which data points are grouped. + * + * See also [[handleInvalid]], which can optionally create an additional bucket for NaN values. + * + * @group param + */ + val numBucketsArray = new IntArrayParam(this, "numBucketsArray", "Array of number of buckets " + +"(quantiles, or categories) into which data points are grouped. This is for multiple " + +"columns input. If numBucketsArray is not set but numBuckets is set, it means user wants " + --- End diff -- "If transforming multiple columns and numBucketsArray is not set, but numBuckets is set, then numBuckets will be applied across all columns."
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153776672 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- (same diff as above) ... +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) +if (isQuantileDiscretizeMultipleColumns) { + var bucketArray = Array.empty[Int] --- End diff --

```scala
val bucketSeq = if (isSet(numBucketsArray)) {
  $(numBucketsArray).toSeq
} else {
  Seq($(numBuckets))
}
```
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153786414 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,166 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + --- End diff -- We should add 2 tests (sketched below): 1. test that setting `numBuckets` is the same as setting `numBucketsArray` explicitly with identical values; 2. test that QD over multiple columns produces the same results as 2x QDs over the same columns (as we did for Bucketizer)
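Hedged sketches of those two tests (the data, column names, and bucket counts are illustrative; `===` comes from the suite's ScalaTest setup):

```scala
test("numBuckets is equivalent to an identical numBucketsArray") {
  val spark = this.spark
  import spark.implicits._
  val data = Array.range(1, 21).map(_.toDouble)
  val df = data.zip(data.map(_ * 2)).toSeq.toDF("input1", "input2")

  val viaNumBuckets = new QuantileDiscretizer()
    .setInputCols(Array("input1", "input2"))
    .setOutputCols(Array("result1", "result2"))
    .setNumBuckets(4)
  val viaArray = new QuantileDiscretizer()
    .setInputCols(Array("input1", "input2"))
    .setOutputCols(Array("result1", "result2"))
    .setNumBucketsArray(Array(4, 4))

  assert(viaNumBuckets.fit(df).transform(df).collect() ===
    viaArray.fit(df).transform(df).collect())
}

test("multi-column discretizer matches two single-column discretizers") {
  val spark = this.spark
  import spark.implicits._
  val data = Array.range(1, 21).map(_.toDouble)
  val df = data.zip(data.map(_ * 2)).toSeq.toDF("input1", "input2")

  val multi = new QuantileDiscretizer()
    .setInputCols(Array("input1", "input2"))
    .setOutputCols(Array("result1", "result2"))
    .setNumBuckets(4)
  val single1 = new QuantileDiscretizer()
    .setInputCol("input1").setOutputCol("result1").setNumBuckets(4)
  val single2 = new QuantileDiscretizer()
    .setInputCol("input2").setOutputCol("result2").setNumBuckets(4)

  // Chaining the two single-column discretizers yields the same column layout
  val expected = single2.fit(df).transform(single1.fit(df).transform(df))
  assert(multi.fit(df).transform(df).collect() === expected.collect())
}
```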
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153774170 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- (same diff as above) ... +"columns input. If numBucketsArray is not set but numBuckets is set, it means user wants " + +"to use the same numBuckets across all columns.") --- End diff -- Need a validator function here to ensure all bucket values >= 2
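`IntArrayParam` accepts an `isValid` predicate, so the check could look like this (a sketch assuming `ParamValidators` from `org.apache.spark.ml.param` is in scope, as it already is for `numBuckets`; the doc string is abbreviated):

```scala
val numBucketsArray = new IntArrayParam(this, "numBucketsArray",
  "Array of number of buckets (quantiles, or categories) into which data points " +
    "are grouped. Each value must be greater than or equal to 2.",
  // reject any array containing a bucket count below 2
  (arrayOfNumBuckets: Array[Int]) => arrayOfNumBuckets.forall(ParamValidators.gtEq(2)))
```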
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153775090 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -86,6 +104,10 @@ private[feature] trait QuantileDiscretizerBase extends Params * categorical features. The number of bins can be set using the `numBuckets` parameter. It is * possible that the number of buckets used will be smaller than this value, for example, if there * are too few distinct values of the input to create enough distinct quantiles. + * Since 2.3.0, --- End diff -- Let's match the Bucketizer comment. So something like: ``` ... Since 2.3.0, `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter. Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be printed and only `inputCol` will take effect, while `inputCols` will be ignored. To specify the number of buckets for each column , the `numBucketsArray ` parameter can be set, or if the number of buckets should be the same across columns, `numBuckets` can be set as a convenience. ```
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153785692 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- (same diff as above) ... +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) +if (isQuantileDiscretizeMultipleColumns) { --- End diff -- This section overall seems like it can be cleaned up: it should be possible to have one code path for a Seq of numBuckets, and when transforming only one column the splits array is just its first element. You could handle the case of a single `numBuckets` value by using `Array.fill` with that value (if `numBucketsArray` is not set), as sketched below.
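A rough sketch of that single code path (it assumes `getInOutCols` from earlier in the thread and runs inside `fit`, so `dataset` is in scope; one `approxQuantile` call per column, with the single-column case simply reading `splitsArray(0)`):

```scala
val (inputColNames, _) = getInOutCols
// numBuckets is broadcast across all columns when numBucketsArray is not set
val numBucketsPerCol = if (isSet(numBucketsArray)) {
  $(numBucketsArray)
} else {
  Array.fill(inputColNames.length)($(numBuckets))
}
val splitsArray = inputColNames.zip(numBucketsPerCol).map {
  case (colName, numOfBuckets) =>
    dataset.stat.approxQuantile(colName,
      (0.0 to 1.0 by 1.0 / numOfBuckets).toArray, $(relativeError))
}
```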
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153772861 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretize` only map one column specified by `inputCol`") --- End diff -- 'only map' -> 'will only map' --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153775451 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -50,10 +50,26 @@ private[feature] trait QuantileDiscretizerBase extends Params /** @group getParam */ def getNumBuckets: Int = getOrDefault(numBuckets) + /** + * Array of number of buckets (quantiles, or categories) into which data points are grouped. --- End diff -- Can add a comment about "each value must be greater than or equal to 2" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
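A hypothetical way to encode that "greater than or equal to 2" requirement in both the param doc and its validator (the exact validator used by the PR may differ):

```scala
import org.apache.spark.ml.param.{IntArrayParam, Params}

// Hypothetical trait; shows the doc text and validity check together.
trait HasNumBucketsArray extends Params {

  /**
   * Array of number of buckets (quantiles, or categories) into which data
   * points are grouped. Each value must be greater than or equal to 2.
   */
  val numBucketsArray: IntArrayParam = new IntArrayParam(this, "numBucketsArray",
    "Array of number of buckets (quantiles, or categories) into which data points are " +
      "grouped. Each value must be greater than or equal to 2.",
    (arr: Array[Int]) => arr.forall(_ >= 2))

  /** @group getParam */
  def getNumBucketsArray: Array[Int] = $(numBucketsArray)
}
```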
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user MLnick commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r153772930 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretize` only map one column specified by `inputCol`") --- End diff -- `QuantileDiscretize` -> `QuantileDiscretizer` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150518031 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets == expectedNumBucket, +s"Observed number of buckets are not correct." 
+ + s" Expected $expectedNumBucket but found ($observedNumBuckets") +} + } + + test("Multiple Columns: Test transform on data with NaN value") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 3 +val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) +val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0) +val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) +val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0) + +val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedKeep1(idx), expectedKeep2(idx)) +} +val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", "expected1", "expected2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) + +withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") { + intercept[SparkException] { +discretizer.fit(dataFrame).transform(dataFrame).collect() + } +} + +discretizer.setHandleInvalid("keep") +discretizer.fit(dataFrame).transform(dataFrame). + select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => +assert(r1 === e1, + s"The result value is not correct after bucketing. Expected $e1 but found $r1") +assert(r2 === e2, + s"The result value is
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150451065 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets == expectedNumBucket, +s"Observed number of buckets are not correct." 
+ + s" Expected $expectedNumBucket but found ($observedNumBuckets") +} + } + + test("Multiple Columns: Test transform on data with NaN value") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 3 +val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) +val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0) +val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) +val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0) + +val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedKeep1(idx), expectedKeep2(idx)) +} +val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", "expected1", "expected2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) + +withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") { + intercept[SparkException] { +discretizer.fit(dataFrame).transform(dataFrame).collect() + } +} + +discretizer.setHandleInvalid("keep") +discretizer.fit(dataFrame).transform(dataFrame). + select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => +assert(r1 === e1, + s"The result value is not correct after bucketing. Expected $e1 but found $r1") +assert(r2 === e2, + s"The result value
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450334 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} --- End diff -- Will change to data1.zip(data2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450305 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") --- End diff -- Will remove DataFrame --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450319 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") --- End diff -- Will remove DataFrame. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450280 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} --- End diff -- Yes. Will change to data1.zip(data2) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450222 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +152,95 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretize` only map one column specified by `inputCol`") + false +} else if (isSet(inputCols)) { + true +} else { + false +} + } + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +if (!isQuantileDiscretizeMultipleColumns) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"inputCols number do not match outputCols") + ($(inputCols), $(outputCols)) +} + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { -SchemaUtils.checkNumericType(schema, $(inputCol)) -val inputFields = schema.fields -require(inputFields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() +val (inputColNames, outputColNames) = getInOutCols +val existingFields = schema.fields +var outputFields = existingFields +inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => + SchemaUtils.checkNumericType(schema, inputColName) + require(existingFields.forall(_.name != outputColName), +s"Output column ${outputColName} already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + outputFields :+= attr.toStructField() +} StructType(outputFields) } @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { transformSchema(dataset.schema, logging = true) -val splits = dataset.stat.approxQuantile($(inputCol), - (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) +if (isQuantileDiscretizeMultipleColumns) { + var bucketArray = Array.empty[Int] + if (isSet(numBucketsArray)) { +bucketArray = $(numBucketsArray) + } + else { +bucketArray = Array($(numBuckets)) + } + val probabilityArray = bucketArray.toSeq.flatMap { numOfBucket => +(0.0 to 1.0 by 1.0 / numOfBucket) + } + val splitsArray = dataset.stat.approxQuantile($(inputCols), +probabilityArray.sorted.toArray.distinct, $(relativeError)) + val distinctSplitsArray = splitsArray.toSeq.map { splits => +getDistinctSplits(splits) + } + bucketizer.setSplitsArray(distinctSplitsArray.toArray) + copyValues(bucketizer.setParent(this)) +} +else { --- End diff -- Will fix this. And fix the same problem in another place. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150450151 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -104,7 +126,8 @@ private[feature] trait QuantileDiscretizerBase extends Params */ @Since("1.6.0") final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val uid: String) - extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable { + extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable +with HasInputCols with HasOutputCols { --- End diff -- I guess I will leave this as is even though it's a bit weird. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401598 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -104,7 +126,8 @@ private[feature] trait QuantileDiscretizerBase extends Params */ @Since("1.6.0") final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val uid: String) - extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable { + extends Estimator[Bucketizer] with QuantileDiscretizerBase with DefaultParamsWritable +with HasInputCols with HasOutputCols { --- End diff -- It looks a bit weird to have `HasInputCols` and `HasOutputCols` directly in `QuantileDiscretizer` while the other params live in `QuantileDiscretizerBase`. But extending `HasInputCols` and `HasOutputCols` in `QuantileDiscretizerBase` causes a binary compatibility issue, and I don't think we want to break compatibility just to add multi-column support. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
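A self-contained analogue of the trade-off viirya describes, with simplified stand-in traits rather than Spark's real ones: adding new parents to the base trait would change its binary signature, so the new params are mixed into the concrete class only:

```scala
// Stand-in traits, not Spark's actual params traits.
trait HasSingleCol { def inputCol: String = "input" }
trait HasManyCols { def inputCols: Array[String] = Array("input1", "input2") }

// Unchanged parents: code already compiled against the base trait stays binary compatible.
trait DiscretizerBase extends HasSingleCol

// The new capability is added at the leaf of the hierarchy only.
class Discretizer extends DiscretizerBase with HasManyCols

object MixinLayoutDemo extends App {
  val d = new Discretizer
  println(d.inputCol)                 // from the base trait
  println(d.inputCols.mkString(", ")) // from the newly mixed-in trait
}
```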
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401672 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +152,95 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + +"`QuantileDiscretize` only map one column specified by `inputCol`") + false +} else if (isSet(inputCols)) { + true +} else { + false +} + } + + private[feature] def getInOutCols: (Array[String], Array[String]) = { +if (!isQuantileDiscretizeMultipleColumns) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"inputCols number do not match outputCols") + ($(inputCols), $(outputCols)) +} + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { -SchemaUtils.checkNumericType(schema, $(inputCol)) -val inputFields = schema.fields -require(inputFields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() +val (inputColNames, outputColNames) = getInOutCols +val existingFields = schema.fields +var outputFields = existingFields +inputColNames.zip(outputColNames).map { case (inputColName, outputColName) => + SchemaUtils.checkNumericType(schema, inputColName) + require(existingFields.forall(_.name != outputColName), +s"Output column ${outputColName} already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + outputFields :+= attr.toStructField() +} StructType(outputFields) } @Since("2.0.0") override def fit(dataset: Dataset[_]): Bucketizer = { transformSchema(dataset.schema, logging = true) -val splits = dataset.stat.approxQuantile($(inputCol), - (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError)) +val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid)) +if (isQuantileDiscretizeMultipleColumns) { + var bucketArray = Array.empty[Int] + if (isSet(numBucketsArray)) { +bucketArray = $(numBucketsArray) + } + else { +bucketArray = Array($(numBuckets)) + } + val probabilityArray = bucketArray.toSeq.flatMap { numOfBucket => +(0.0 to 1.0 by 1.0 / numOfBucket) + } + val splitsArray = dataset.stat.approxQuantile($(inputCols), +probabilityArray.sorted.toArray.distinct, $(relativeError)) + val distinctSplitsArray = splitsArray.toSeq.map { splits => +getDistinctSplits(splits) + } + bucketizer.setSplitsArray(distinctSplitsArray.toArray) + copyValues(bucketizer.setParent(this)) +} +else { --- End diff -- style issue: ```scala ... } else { ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401768 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} --- End diff -- `val data` seems just as `data1.zip(data2)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
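A quick check of the equivalence viirya points out, using the same arrays as the test:

```scala
// Both constructions build the same Seq of (Double, Double) pairs.
val data1 = Array.range(1, 11, 1).map(_.toDouble)
val data2 = Array.range(1, 20, 2).map(_.toDouble)

val byIndex = (0 until 10).map { idx => (data1(idx), data2(idx)) }
val zipped = data1.zip(data2).toSeq

assert(byIndex == zipped)
```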
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401807 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") --- End diff -- nit: Remove `DataFrame`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150404292 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets == expectedNumBucket, +s"Observed number of buckets are not correct." 
+ + s" Expected $expectedNumBucket but found ($observedNumBuckets") +} + } + + test("Multiple Columns: Test transform on data with NaN value") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 3 +val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) +val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0) +val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) +val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0) + +val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedKeep1(idx), expectedKeep2(idx)) +} +val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", "expected1", "expected2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) + +withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") { + intercept[SparkException] { +discretizer.fit(dataFrame).transform(dataFrame).collect() + } +} + +discretizer.setHandleInvalid("keep") +discretizer.fit(dataFrame).transform(dataFrame). + select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => +assert(r1 === e1, + s"The result value is not correct after bucketing. Expected $e1 but found $r1") +assert(r2 === e2, + s"The result value is
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401744 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") --- End diff -- nit: No need for the explicit type `DataFrame`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r150401804 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite val model = discretizer.fit(df) assert(model.hasParent) } + + test("Multiple Columns: Test observed number of buckets and their sizes match expected values") { +val spark = this.spark +import spark.implicits._ + +val datasetSize = 10 +val numBuckets = 5 +val data1 = Array.range(1, 11, 1).map(_.toDouble) +val data2 = Array.range(1, 20, 2).map(_.toDouble) +val data = (0 until 10).map { idx => + (data1(idx), data2(idx)) +} +val df: DataFrame = data.toSeq.toDF("input1", "input2") + +val discretizer = new QuantileDiscretizer() + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("result1", "result2")) + .setNumBuckets(numBuckets) +assert(discretizer.isQuantileDiscretizeMultipleColumns()) +val result = discretizer.fit(df).transform(df) + +val relativeError = discretizer.getRelativeError +val isGoodBucket = udf { + (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= (relativeError * datasetSize) +} + +for (i <- 1 to 2) { + val observedNumBuckets = result.select("result" + i).distinct.count + assert(observedNumBuckets === numBuckets, +"Observed number of buckets does not equal expected number of buckets.") + + val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count + assert(numGoodBuckets === numBuckets, +"Bucket sizes are not within expected relative error tolerance.") +} + } + + test("Multiple Columns: Test on data with high proportion of duplicated values") { +val spark = this.spark +import spark.implicits._ + +val numBuckets = 5 +val expectedNumBucket = 3 +val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0) +val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0) +val data = (0 until data1.length).map { idx => + (data1(idx), data2(idx)) +} --- End diff -- Use `data1.zip(data2)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
GitHub user huaxingao opened a pull request: https://github.com/apache/spark/pull/19715 [SPARK-22397][ML]add multiple columns support to QuantileDiscretizer ## What changes were proposed in this pull request? Add multi-column support to QuantileDiscretizer. ## How was this patch tested? Added unit tests in QuantileDiscretizerSuite to cover the multi-column support. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huaxingao/spark spark_22397 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19715.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19715 commit 07bd868956e8d63294b2acb0b5d01a7ca2b35866 Author: Huaxin Gao Date: 2017-11-10T06:57:04Z [SPARK-22397][ML]add multiple columns support to QuantileDiscretizer --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
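A short sketch of the NaN handling exercised by the new tests (assumes an active SparkSession named `spark`; values are illustrative):

```scala
import org.apache.spark.ml.feature.QuantileDiscretizer

// Assumes an active SparkSession named `spark`.
import spark.implicits._

val df = Seq(
  (-0.5, 0.2), (0.0, 0.8), (0.5, Double.NaN), (Double.NaN, 0.1)
).toDF("input1", "input2")

val discretizer = new QuantileDiscretizer()
  .setInputCols(Array("input1", "input2"))
  .setOutputCols(Array("result1", "result2"))
  .setNumBuckets(2)
  .setHandleInvalid("keep") // with "error" (the default), NaN rows throw at transform time

// With "keep", NaN values land in an extra bucket appended after the regular ones.
discretizer.fit(df).transform(df).show()
```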