[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19715


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-21 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r158347717
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -386,19 +382,16 @@ class QuantileDiscretizerSuite
     testDefaultReadWrite(discretizer)
   }
 
-  test("Both inputCol and inputCols are set") {
-    val spark = this.spark
-    import spark.implicits._
-    val discretizer = new QuantileDiscretizer()
-      .setInputCol("input")
-      .setOutputCol("result")
-      .setNumBuckets(3)
-      .setInputCols(Array("input1", "input2"))
-    val df = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
-      .map(Tuple1.apply).toDF("input")
-    // When both inputCol and inputCols are set, we throw Exception.
-    intercept[Exception] {
-      discretizer.fit(df)
+  test("multiple columns: Both inputCol and inputCols are set") {
+    intercept[IllegalArgumentException] {
+      new QuantileDiscretizer().setInputCol("in").setInputCols(Array("in1", "in2")).getInOutCols
--- End diff --

Thanks for spending so much time reviewing this PR. I will change this.


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-21 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r158239692
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -386,19 +382,16 @@ class QuantileDiscretizerSuite
     testDefaultReadWrite(discretizer)
   }
 
-  test("Both inputCol and inputCols are set") {
-    val spark = this.spark
-    import spark.implicits._
-    val discretizer = new QuantileDiscretizer()
-      .setInputCol("input")
-      .setOutputCol("result")
-      .setNumBuckets(3)
-      .setInputCols(Array("input1", "input2"))
-    val df = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
-      .map(Tuple1.apply).toDF("input")
-    // When both inputCol and inputCols are set, we throw Exception.
-    intercept[Exception] {
-      discretizer.fit(df)
+  test("multiple columns: Both inputCol and inputCols are set") {
+    intercept[IllegalArgumentException] {
+      new QuantileDiscretizer().setInputCol("in").setInputCols(Array("in1", "in2")).getInOutCols
--- End diff --

I think I slightly prefer to actually test that the error is thrown during `transform`.
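The testing style suggested above (assert the mis-configuration error at the point of use, rather than by calling an internal getter such as `getInOutCols` directly) can be sketched in plain Python. This is a minimal illustrative stand-in, not the actual Spark API; the class and method names are assumptions.

```python
# Illustrative stand-in (NOT Spark's QuantileDiscretizer): validation is
# deferred to fit(), so the test exercises the public entry point.
class FakeDiscretizer:
    def __init__(self):
        self.input_col = None
        self.input_cols = None

    def set_input_col(self, col):
        self.input_col = col
        return self

    def set_input_cols(self, cols):
        self.input_cols = cols
        return self

    def fit(self, df):
        # The check runs at fit time, mirroring the review suggestion that
        # the error should surface during fit/transform, not configuration.
        if self.input_col is not None and self.input_cols is not None:
            raise ValueError("Only one of inputCol and inputCols may be set.")
        return self

# The test then asserts the exception is thrown when fit() is invoked:
try:
    FakeDiscretizer().set_input_col("in").set_input_cols(["in1", "in2"]).fit(df=None)
    raised = False
except ValueError:
    raised = True
assert raised
```

Testing through `fit`/`transform` keeps the test coupled to observable behavior rather than to a private helper, so it survives internal refactoring.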


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157282154
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +147,258 @@ class QuantileDiscretizerSuite
     val model = discretizer.fit(df)
     assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes match expected values") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val datasetSize = 10
+    val numBuckets = 5
+    val data1 = Array.range(1, 11, 1).map(_.toDouble)
+    val data2 = Array.range(1, 20, 2).map(_.toDouble)
+    val df = data1.zip(data2).toSeq.toDF("input1", "input2")
+
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("result1", "result2"))
+      .setNumBuckets(numBuckets)
+    val result = discretizer.fit(df).transform(df)
+
+    val relativeError = discretizer.getRelativeError
+    val isGoodBucket = udf {
+      (size: Int) => math.abs(size - (datasetSize / numBuckets)) <= (relativeError * datasetSize)
+    }
+
+    for (i <- 1 to 2) {
+      val observedNumBuckets = result.select("result" + i).distinct.count
+      assert(observedNumBuckets === numBuckets,
+        "Observed number of buckets does not equal expected number of buckets.")
+
+      val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count
+      assert(numGoodBuckets === numBuckets,
+        "Bucket sizes are not within expected relative error tolerance.")
+    }
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated values") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBuckets = 5
+    val expectedNumBucket = 3
+    val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0)
+    val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0)
+    val df = data1.zip(data2).toSeq.toDF("input1", "input2")
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("result1", "result2"))
+      .setNumBuckets(numBuckets)
+    val result = discretizer.fit(df).transform(df)
+    for (i <- 1 to 2) {
+      val observedNumBuckets = result.select("result" + i).distinct.count
+      assert(observedNumBuckets == expectedNumBucket,
+        s"Observed number of buckets are not correct." +
+          s" Expected $expectedNumBucket but found ($observedNumBuckets")
+    }
+  }
+
+  test("Multiple Columns: Test transform on data with NaN value") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBuckets = 3
+    val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN)
+    val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0)
+    val expectedSkip1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0)
+    val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, Double.NaN, Double.NaN, Double.NaN)
+    val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0)
+    val expectedSkip2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0)
+
+    val discretizer = new QuantileDiscretizer()
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("result1", "result2"))
+      .setNumBuckets(numBuckets)
+
+    withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") {
+      val dataFrame: DataFrame = validData1.zip(validData2).toSeq.toDF("input1", "input2")
+      intercept[SparkException] {
+        discretizer.fit(dataFrame).transform(dataFrame).collect()
+      }
+    }
+
+    List(("keep", expectedKeep1, expectedKeep2), ("skip", expectedSkip1, expectedSkip2)).foreach {
+      case (u, v, w) =>
+        discretizer.setHandleInvalid(u)
+        val dataFrame: DataFrame = validData1.zip(validData2).zip(v).zip(w).map {
+          case (((a, b), c), d) => (a, b, c, d)
+        }.toSeq.toDF("input1", "input2", "expected1", "expected2")
+        dataFrame.show
+        val result = discretizer.fit(dataFrame).transform(dataFrame)
+        result.show
+        result.select("result1", "expected1", "result2", "expected2").collect().foreach {
+          case Row(x: Double, y: Double, z: Double, w: Double) =>
+            assert(x === y && w === z)
+        }
+    }
+  }
+
+  test("Multiple Columns: Test numBucketsArray") {
+    val spark = this.spark
+    import 

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157156875
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
[...]

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157156926
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
[...]

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157158204
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
[...]

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157156113
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +147,258 @@ class QuantileDiscretizerSuite
[...]
+    List(("keep", expectedKeep1, expectedKeep2), ("skip", expectedSkip1, expectedSkip2)).foreach {
+      case (u, v, w) =>
+        discretizer.setHandleInvalid(u)
+        val dataFrame: DataFrame = validData1.zip(validData2).zip(v).zip(w).map {
+          case (((a, b), c), d) => (a, b, c, d)
+        }.toSeq.toDF("input1", "input2", "expected1", "expected2")
+        dataFrame.show
--- End diff --

remove the `show` call here


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157156164
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +147,258 @@ class QuantileDiscretizerSuite
[...]
+        dataFrame.show
+        val result = discretizer.fit(dataFrame).transform(dataFrame)
+        result.show
--- End diff --

here too


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-15 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r157158481
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +155,96 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+    require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && !isSet(outputCols)) ||
+      (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && isSet(outputCols)),
+      "QuantileDiscretizer only supports setting either inputCol/outputCol or" +
+        "inputCols/outputCols."
+    )
+
+    if (isSet(inputCol)) {
+      (Array($(inputCol)), Array($(outputCol)))
+    } else {
+      require($(inputCols).length == $(outputCols).length,

We should add a small test case for mismatched sizes of `inputCols` / `outputCols`.
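The mismatched-size check that the comment above asks to cover can be sketched in plain Python. The function name, signature, and error messages below are illustrative assumptions, not Spark's actual API; the point is only the shape of the validation and the test.

```python
# Illustrative plain-Python sketch of getInOutCols-style validation:
# exactly one of the single-column or multi-column param pairs may be set,
# and in the multi-column case the two arrays must have equal length.
def get_in_out_cols(input_col=None, output_col=None, input_cols=None, output_cols=None):
    single = (input_col is not None and output_col is not None
              and input_cols is None and output_cols is None)
    multi = (input_col is None and output_col is None
             and input_cols is not None and output_cols is not None)
    if not (single or multi):
        raise ValueError("Set either inputCol/outputCol or inputCols/outputCols.")
    if single:
        return [input_col], [output_col]
    if len(input_cols) != len(output_cols):
        # The requested test case: mismatched sizes must fail loudly.
        raise ValueError("inputCols and outputCols must have the same length.")
    return list(input_cols), list(output_cols)

# Matched sizes pass; mismatched sizes raise:
assert get_in_out_cols(input_cols=["in1", "in2"], output_cols=["out1", "out2"]) == (
    ["in1", "in2"], ["out1", "out2"])
try:
    get_in_out_cols(input_cols=["in1", "in2"], output_cols=["out1"])
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
assert mismatch_rejected
```

Validating both the mutual exclusion and the length match in one helper keeps every transform path behind a single, easily testable precondition.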


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156442207
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -107,11 +107,11 @@ private[feature] trait QuantileDiscretizerBase extends Params
  * possible that the number of buckets used will be smaller than this value, for example, if there
  * are too few distinct values of the input to create enough distinct quantiles.
  * Since 2.3.0,
- * `QuantileDiscretizer ` can map multiple columns at once by setting the `inputCols` parameter.
- * Note that when both the `inputCol` and `inputCols` parameters are set, a log warning will be
- * printed and only `inputCol` will take effect, while `inputCols` will be ignored. To specify
- * the number of bucketsfor each column , the `numBucketsArray ` parameter can be set, or if the
- *  number of buckets should be the same across columns, `numBuckets` can be set as a convenience.
+ * `QuantileDiscretizer` can map multiple columns at once by setting the `inputCols` parameter.
+ * Note that only one of `inputCol` and `inputCols` parameters can be set. If both of the
+ * `inputCol` and `inputCols` parameters are set, an Exception will be thrown. To specify the

@MLnick Thank you very much for your comments. I will change these. 


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-12 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156356881
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -107,11 +107,11 @@ private[feature] trait QuantileDiscretizerBase 
extends Params
  * possible that the number of buckets used will be smaller than this 
value, for example, if there
  * are too few distinct values of the input to create enough distinct 
quantiles.
  * Since 2.3.0,
- * `QuantileDiscretizer ` can map multiple columns at once by setting the 
`inputCols` parameter.
- * Note that when both the `inputCol` and `inputCols` parameters are set, 
a log warning will be
- * printed and only `inputCol` will take effect, while `inputCols` will be 
ignored. To specify
- * the number of bucketsfor each column , the `numBucketsArray ` parameter 
can be set, or if the
- *  number of buckets should be the same across columns, `numBuckets` can 
be set as a convenience.
+ * `QuantileDiscretizer` can map multiple columns at once by setting the 
`inputCols` parameter.
+ * Note that only one of `inputCol` and `inputCols` parameters can be set. 
If both of the
+ * `inputCol` and `inputCols` parameters are set, an Exception will be 
thrown. To specify the
--- End diff --

Think we can simplify to "If both `inputCol` and `inputCols` are set, ..." 
(since we already said in the previous sentence that only one of the parameters 
can be set)


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-12 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156323615
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -168,20 +168,13 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.3.0")
   def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
 
-  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
-if (isSet(inputCols) && isSet(inputCol)) {
-  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
-"`QuantileDiscretizer` will only map one column specified by 
`inputCol`")
-  false
-} else if (isSet(inputCols)) {
-  true
-} else {
-  false
-}
-  }
-
   private[feature] def getInOutCols: (Array[String], Array[String]) = {
-if (!isQuantileDiscretizeMultipleColumns) {
+require((isSet(inputCol) && isSet(outputCol) && !isSet(inputCols) && 
!isSet(outputCols)) ||
+  (!isSet(inputCol) && !isSet(outputCol) && isSet(inputCols) && 
isSet(outputCols)),
+  "Only allow to set either inputCol/outputCol, or 
inputCols/outputCols"
--- End diff --

I think a better message is something like `QuantileDiscretizer only 
supports setting either ...`


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-11 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156160532
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +156,102 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
--- End diff --

@WeichenXu123 I will change to throw Exception. Thanks. 


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156010806
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +156,102 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
--- End diff --

According to the discussion at JIRA SPARK-8418, should we throw an 
exception when both inputCol and inputCols are specified?
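A minimal, Spark-free sketch of the exclusive-parameter check being discussed; the `Cols` case class and `checkExclusive` name are illustrative (the PR ultimately enforces this inside `getInOutCols` via `require`):

```scala
// Hypothetical stand-in for the Param machinery: only one of the two
// input-column parameters may be set at a time.
case class Cols(inputCol: Option[String] = None,
                inputCols: Option[Array[String]] = None)

def checkExclusive(c: Cols): Unit =
  require(!(c.inputCol.isDefined && c.inputCols.isDefined),
    "Only one of inputCol and inputCols can be set.")

checkExclusive(Cols(inputCol = Some("in")))  // passes silently
// checkExclusive(Cols(Some("in"), Some(Array("in1", "in2"))))
// would throw IllegalArgumentException, matching the behavior agreed on here
```

`require` throws `IllegalArgumentException`, which is why the test suite later intercepts that exception type.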


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-08 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r155744727
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +156,106 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretizer` will only map one column specified by 
`inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
--- End diff --

`map` can be `foreach` because there's no return value
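A tiny illustration of the point (the values are made up): when the body runs only for its side effect, `foreach` states that intent, while `map` would allocate and immediately discard a `Seq[Unit]`.

```scala
// foreach: side-effect-only iteration, no throwaway collection is built
val names = Seq("in1", "in2")
val sb = new StringBuilder
names.foreach { n => sb.append(n).append(';') }
assert(sb.toString == "in1;in2;")
```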


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-08 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r154571718
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -105,9 +107,11 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
  * possible that the number of buckets used will be smaller than this 
value, for example, if there
  * are too few distinct values of the input to create enough distinct 
quantiles.
  * Since 2.3.0,
- * `QuantileDiscretizer` can also map multiple columns at once. Whether it 
goes to map a column or
- * multiple columns, it depends on which parameter of `inputCol` and 
`inputCols` is set. When both
- * are set, a log warning will be printed and by default it chooses 
`inputCol`.
+ * `QuantileDiscretizer ` can map multiple columns at once by setting the 
`inputCols` parameter.
+ * Note that when both the `inputCol` and `inputCols` parameters are set, 
a log warning will be
+ * printed and only `inputCol` will take effect, while `inputCols` will be 
ignored. To specify
+ * the number of bucketsfor each column , the `numBucketsArray ` parameter 
can be set, or if the
--- End diff --

"bucketsfor" -> "buckets for"

and remove the leading space from " number of buckets ..." on next line


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-08 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r155759105
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +156,106 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretizer` will only map one column specified by 
`inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
--- End diff --

Looking at this now, the `Array.fill` approach probably adds needless 
complexity.

But the multi-buckets case can perhaps still be cleaned up. How about 
something like this:

```scala
  override def fit(dataset: Dataset[_]): Bucketizer = {
transformSchema(dataset.schema, logging = true)
val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
if (isQuantileDiscretizeMultipleColumns) {
  val splitsArray = if (isSet(numBucketsArray)) {
val probArrayPerCol = $(numBucketsArray).map { numOfBuckets =>
  (0.0 to 1.0 by 1.0 / numOfBuckets).toArray
}

val probabilityArray = probArrayPerCol.flatten.sorted.distinct
val splitsArrayRaw = dataset.stat.approxQuantile($(inputCols),
  probabilityArray, $(relativeError))

splitsArrayRaw.zip(probArrayPerCol).map { case (splits, probs) =>
  val probSet = probs.toSet
  val idxSet = probabilityArray.zipWithIndex.collect {
case (p, idx) if probSet(p) =>
  idx
  }.toSet
  splits.zipWithIndex.collect {
case (s, idx) if idxSet(idx) =>
  s
  }
}
  } else {
dataset.stat.approxQuantile($(inputCols),
  (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
  }
  bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits))
} else {
  val splits = dataset.stat.approxQuantile($(inputCol),
(0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
  bucketizer.setSplits(getDistinctSplits(splits))
}
copyValues(bucketizer.setParent(this))
  }
```

Then we don't 
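The index-extraction idea in the snippet above can be checked with a Spark-free sketch: the union of all columns' probability grids is computed once, and each column then keeps only the quantiles that belong to its own grid. The input arrays below are illustrative, and `splitsArrayRaw` stands in for what `approxQuantile` would return.

```scala
// Two columns with different bucket counts produce different probability grids
val probArrayPerCol = Array(
  Array(0.0, 0.5, 1.0),               // numBuckets = 2
  Array(0.0, 0.25, 0.5, 0.75, 1.0))   // numBuckets = 4

// One merged, de-duplicated grid shared by the single approxQuantile call
val probabilityArray = probArrayPerCol.flatten.sorted.distinct

// Stand-in for approxQuantile output: one row of quantiles per column,
// all computed at the merged grid (here the probabilities themselves)
val splitsArrayRaw = probArrayPerCol.map(_ => probabilityArray)

// Each column keeps only the entries whose probability is in its own grid
val splitsArray = splitsArrayRaw.zip(probArrayPerCol).map { case (splits, probs) =>
  val probSet = probs.toSet
  val idxSet = probabilityArray.zipWithIndex.collect {
    case (p, idx) if probSet(p) => idx
  }.toSet
  splits.zipWithIndex.collect { case (s, idx) if idxSet(idx) => s }
}
```

Column 0 recovers its coarse 3-point grid even though the shared computation used all 5 merged probabilities.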

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-30 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r154133181
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
+if (isQuantileDiscretizeMultipleColumns) {
--- End diff --

@MLnick Thank you very much for your comments. I have two sets of if/else 
clauses. Do you mean to combine the code paths for multiple columns and a 
single column, or the code paths for numBucketsArray and numBuckets, or both 
of them? 


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153774332
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -50,10 +50,26 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
   /** @group getParam */
   def getNumBuckets: Int = getOrDefault(numBuckets)
 
+  /**
+   * Array of number of buckets (quantiles, or categories) into which data 
points are grouped.
+   *
+   * See also [[handleInvalid]], which can optionally create an additional 
bucket for NaN values.
+   *
+   * @group param
+   */
+  val numBucketsArray = new IntArrayParam(this, "numBucketsArray", "Array 
of number of buckets " +
+"(quantiles, or categories) into which data points are grouped. This 
is for multiple " +
+"columns input. If numBucketsArray is not set but numBuckets is set, 
it means user wants " +
--- End diff --

"If transforming multiple columns and numBucketsArray is not set, but 
numBuckets is set, then numBuckets will be applied across all columns."


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153776672
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
+if (isQuantileDiscretizeMultipleColumns) {
+  var bucketArray = Array.empty[Int]
--- End diff --

```scala
val bucketSeq = if (isSet(numBucketsArray)) {
  $(numBucketsArray).toSeq
} else {
  Seq($(numBuckets))
}


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153786414
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,166 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
--- End diff --

We should add 2 tests:

1. test setting `numBuckets` is the same as setting `numBucketsArray` 
explicitly with identical values
2. test that QD over multiple columns produces the same results as 2x QDs 
over the same columns (as we did for Bucketizer)


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153774170
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -50,10 +50,26 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
   /** @group getParam */
   def getNumBuckets: Int = getOrDefault(numBuckets)
 
+  /**
+   * Array of number of buckets (quantiles, or categories) into which data 
points are grouped.
+   *
+   * See also [[handleInvalid]], which can optionally create an additional 
bucket for NaN values.
+   *
+   * @group param
+   */
+  val numBucketsArray = new IntArrayParam(this, "numBucketsArray", "Array 
of number of buckets " +
+"(quantiles, or categories) into which data points are grouped. This 
is for multiple " +
+"columns input. If numBucketsArray is not set but numBuckets is set, 
it means user wants " +
+"to use the same numBuckets across all columns.")
--- End diff --

Need a validator function here to ensure all bucket values >= 2
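A standalone sketch of the validator being requested; in the actual code it would be passed to the `IntArrayParam` constructor (Spark ML's `ParamValidators` provides similar helpers), but the function below is just the predicate itself:

```scala
// A valid numBucketsArray is non-empty and every bucket count is >= 2
// (fewer than 2 buckets cannot define a split)
val validNumBucketsArray: Array[Int] => Boolean = arr =>
  arr.nonEmpty && arr.forall(_ >= 2)

assert(validNumBucketsArray(Array(2, 5, 10)))
assert(!validNumBucketsArray(Array(3, 1)))   // 1 bucket is rejected
assert(!validNumBucketsArray(Array.empty))
```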


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153775090
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -86,6 +104,10 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
  * categorical features. The number of bins can be set using the 
`numBuckets` parameter. It is
  * possible that the number of buckets used will be smaller than this 
value, for example, if there
  * are too few distinct values of the input to create enough distinct 
quantiles.
+ * Since 2.3.0,
--- End diff --

Let's match the Bucketizer comment. So something like:

```
...
Since 2.3.0, `QuantileDiscretizer ` can map multiple columns at once by 
setting the `inputCols` parameter. 
Note that when both the `inputCol` and `inputCols` parameters are set, a 
log warning will be printed and
only `inputCol` will take effect, while `inputCols` will be ignored. To 
specify the number of buckets 
for each column , the `numBucketsArray ` parameter can be set, or if the 
number of buckets should be the
same across columns, `numBuckets` can be set as a convenience.
```


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153785692
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
+if (isQuantileDiscretizeMultipleColumns) {
--- End diff --

This section overall seems like it can be cleaned up - it should be 
possible to have one code path for a Seq of numBuckets, and at the end, if 
transforming only one column, take the first element of the splits array.

You could check the case of a single `numBuckets` value and `Array.fill` 
that value (if `numBucketsArray` is not set).
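The `Array.fill` suggestion reduces to normalizing both cases to one array of bucket counts; a sketch under assumed names (`bucketCounts` and its parameters are illustrative, not the PR's actual signature):

```scala
// If numBucketsArray is set, use it as-is; otherwise replicate the single
// numBuckets value once per input column, collapsing the two code paths.
def bucketCounts(numBuckets: Int,
                 numBucketsArray: Option[Array[Int]],
                 numCols: Int): Array[Int] =
  numBucketsArray.getOrElse(Array.fill(numCols)(numBuckets))

assert(bucketCounts(5, None, 3).sameElements(Array(5, 5, 5)))
assert(bucketCounts(5, Some(Array(2, 4)), 2).sameElements(Array(2, 4)))
```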


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153772861
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
--- End diff --

'only map' -> 'will only map'


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153775451
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -50,10 +50,26 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
   /** @group getParam */
   def getNumBuckets: Int = getOrDefault(numBuckets)
 
+  /**
+   * Array of number of buckets (quantiles, or categories) into which data 
points are grouped.
--- End diff --

Can add a comment about "each value must be greater than or equal to 2"


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-29 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r153772930
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,119 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
--- End diff --

`QuantileDiscretize` -> `QuantileDiscretizer`


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-13 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150518031
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets == expectedNumBucket,
+s"Observed number of buckets are not correct." +
+  s" Expected $expectedNumBucket but found ($observedNumBuckets")
+}
+  }
+
+  test("Multiple Columns: Test transform on data with NaN value") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 3
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 
3.0)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)
+val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 
3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedKeep1(idx), 
expectedKeep2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", 
"expected1", "expected2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+
+withClue("QuantileDiscretizer with handleInvalid=error should throw 
exception for NaN values") {
+  intercept[SparkException] {
+discretizer.fit(dataFrame).transform(dataFrame).collect()
+  }
+}
+
+discretizer.setHandleInvalid("keep")
+discretizer.fit(dataFrame).transform(dataFrame).
+  select("result1", "expected1", "result2", "expected2")
+  .collect().foreach {
+  case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+assert(r1 === e1,
+  s"The result value is not correct after bucketing. Expected $e1 
but found $r1")
+assert(r2 === e2,
+  s"The result value is 

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150451065
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets == expectedNumBucket,
+s"Observed number of buckets are not correct." +
+  s" Expected $expectedNumBucket but found ($observedNumBuckets")
+}
+  }
+
+  test("Multiple Columns: Test transform on data with NaN value") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 3
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 
3.0)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)
+val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 
3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedKeep1(idx), 
expectedKeep2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", 
"expected1", "expected2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+
+withClue("QuantileDiscretizer with handleInvalid=error should throw 
exception for NaN values") {
+  intercept[SparkException] {
+discretizer.fit(dataFrame).transform(dataFrame).collect()
+  }
+}
+
+discretizer.setHandleInvalid("keep")
+discretizer.fit(dataFrame).transform(dataFrame).
+  select("result1", "expected1", "result2", "expected2")
+  .collect().foreach {
+  case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+assert(r1 === e1,
+  s"The result value is not correct after bucketing. Expected $e1 
but found $r1")
+assert(r2 === e2,
+  s"The result value 

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450334
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
--- End diff --

Will change to data1.zip(data2). 


---
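The refactor agreed to here is purely mechanical: `zip` builds the same pairs as indexing both arrays. A quick sketch using the literals from the quoted test:

```scala
val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0)
val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0)

// Before: index-based pairing
val byIndex = (0 until data1.length).map { idx => (data1(idx), data2(idx)) }

// After: zip produces the same pairs directly; in the test this would feed
// data1.zip(data2).toSeq.toDF("input1", "input2")
val zipped = data1.zip(data2).toSeq
```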




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450305
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
--- End diff --

Will remove DataFrame


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450319
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
--- End diff --

Will remove DataFrame.


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450280
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
--- End diff --

Yes. Will change to data1.zip(data2)


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450222
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,95 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
+if (isQuantileDiscretizeMultipleColumns) {
+  var bucketArray = Array.empty[Int]
+  if (isSet(numBucketsArray)) {
+bucketArray = $(numBucketsArray)
+  }
+  else {
+bucketArray = Array($(numBuckets))
+  }
+  val probabilityArray = bucketArray.toSeq.flatMap { numOfBucket =>
+(0.0 to 1.0 by 1.0 / numOfBucket)
+  }
+  val splitsArray = dataset.stat.approxQuantile($(inputCols),
+probabilityArray.sorted.toArray.distinct, $(relativeError))
+  val distinctSplitsArray = splitsArray.toSeq.map { splits =>
+getDistinctSplits(splits)
+  }
+  bucketizer.setSplitsArray(distinctSplitsArray.toArray)
+  copyValues(bucketizer.setParent(this))
+}
+else {
--- End diff --

Will fix this. And fix the same problem in another place. 


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150450151
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -104,7 +126,8 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
  */
 @Since("1.6.0")
 final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override 
val uid: String)
-  extends Estimator[Bucketizer] with QuantileDiscretizerBase with 
DefaultParamsWritable {
+  extends Estimator[Bucketizer] with QuantileDiscretizerBase with 
DefaultParamsWritable
+with HasInputCols with HasOutputCols {
--- End diff --

I guess I will leave this as is even though it's a bit weird. 


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401598
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -104,7 +126,8 @@ private[feature] trait QuantileDiscretizerBase extends 
Params
  */
 @Since("1.6.0")
 final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override 
val uid: String)
-  extends Estimator[Bucketizer] with QuantileDiscretizerBase with 
DefaultParamsWritable {
+  extends Estimator[Bucketizer] with QuantileDiscretizerBase with 
DefaultParamsWritable
+with HasInputCols with HasOutputCols {
--- End diff --

It looks a bit weird to have `HasInputCols` and `HasOutputCols` directly in 
`QuantileDiscretizer` and leave other params in `QuantileDiscretizerBase`.

But extending `HasInputCols` and `HasOutputCols` in 
`QuantileDiscretizerBase` causes a binary compatibility issue. I think we 
don't want to break compatibility in the effort of adding multi-col support.


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401672
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +152,95 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
+"`QuantileDiscretize` only map one column specified by `inputCol`")
+  false
+} else if (isSet(inputCols)) {
+  true
+} else {
+  false
+}
+  }
+
+  private[feature] def getInOutCols: (Array[String], Array[String]) = {
+if (!isQuantileDiscretizeMultipleColumns) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"inputCols number do not match outputCols")
+  ($(inputCols), $(outputCols))
+}
+  }
+
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-SchemaUtils.checkNumericType(schema, $(inputCol))
-val inputFields = schema.fields
-require(inputFields.forall(_.name != $(outputCol)),
-  s"Output column ${$(outputCol)} already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
+val (inputColNames, outputColNames) = getInOutCols
+val existingFields = schema.fields
+var outputFields = existingFields
+inputColNames.zip(outputColNames).map { case (inputColName, 
outputColName) =>
+  SchemaUtils.checkNumericType(schema, inputColName)
+  require(existingFields.forall(_.name != outputColName),
+s"Output column ${outputColName} already exists.")
+  val attr = NominalAttribute.defaultAttr.withName(outputColName)
+  outputFields :+= attr.toStructField()
+}
 StructType(outputFields)
   }
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): Bucketizer = {
 transformSchema(dataset.schema, logging = true)
-val splits = dataset.stat.approxQuantile($(inputCol),
-  (0.0 to 1.0 by 1.0/$(numBuckets)).toArray, $(relativeError))
+val bucketizer = new Bucketizer(uid).setHandleInvalid($(handleInvalid))
+if (isQuantileDiscretizeMultipleColumns) {
+  var bucketArray = Array.empty[Int]
+  if (isSet(numBucketsArray)) {
+bucketArray = $(numBucketsArray)
+  }
+  else {
+bucketArray = Array($(numBuckets))
+  }
+  val probabilityArray = bucketArray.toSeq.flatMap { numOfBucket =>
+(0.0 to 1.0 by 1.0 / numOfBucket)
+  }
+  val splitsArray = dataset.stat.approxQuantile($(inputCols),
+probabilityArray.sorted.toArray.distinct, $(relativeError))
+  val distinctSplitsArray = splitsArray.toSeq.map { splits =>
+getDistinctSplits(splits)
+  }
+  bucketizer.setSplitsArray(distinctSplitsArray.toArray)
+  copyValues(bucketizer.setParent(this))
+}
+else {
--- End diff --

style issue:

```scala
  ...
} else {
  ...
```


---
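Style nit aside, the multi-column branch of `fit` in the diff above reduces to two Spark-independent steps: flatten every requested bucket count into one sorted, de-duplicated probability array (so a single `approxQuantile` call can serve all input columns), then de-duplicate each returned split array, since `Bucketizer` requires strictly increasing splits. A minimal sketch of just that logic (the helper names are mine, not from the PR):

```scala
// For a bucket count n we need quantiles at 0, 1/n, 2/n, ..., 1.
// Flattening all counts into one sorted, distinct array lets one
// approxQuantile pass cover every column, as the diff above does.
def mergedProbabilities(bucketArray: Array[Int]): Array[Double] =
  bucketArray.toSeq
    .flatMap { n => 0.0 to 1.0 by 1.0 / n }
    .sorted.distinct.toArray

// approxQuantile can return repeated split points on data with many
// duplicated values; Bucketizer needs strictly increasing splits, so
// duplicates must be dropped before setSplitsArray.
def distinctSplits(splits: Array[Double]): Array[Double] =
  splits.distinct.sorted

val probs = mergedProbabilities(Array(2, 4))
// buckets of 2 contribute 0.0, 0.5, 1.0; buckets of 4 add 0.25 and 0.75
```

Note that because one merged probability array is shared, each column's splits are computed at every merged probability; the per-column `getDistinctSplits` cleanup in the diff is what brings each split array back to a valid, strictly increasing form.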




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401768
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
--- End diff --

`val data` seems just as `data1.zip(data2)`?


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401807
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
--- End diff --

nit: Remove `DataFrame`.


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150404292
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+"Observed number of buckets does not equal expected number of 
buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+"Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets == expectedNumBucket,
+s"Observed number of buckets are not correct." +
+  s" Expected $expectedNumBucket but found ($observedNumBuckets")
+}
+  }
+
+  test("Multiple Columns: Test transform on data with NaN value") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 3
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN)
+val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN)
+val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedKeep1(idx), expectedKeep2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", "expected1", "expected2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+
+withClue("QuantileDiscretizer with handleInvalid=error should throw exception for NaN values") {
+  intercept[SparkException] {
+discretizer.fit(dataFrame).transform(dataFrame).collect()
+  }
+}
+
+discretizer.setHandleInvalid("keep")
+discretizer.fit(dataFrame).transform(dataFrame)
+  .select("result1", "expected1", "result2", "expected2")
+  .collect().foreach {
+  case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+assert(r1 === e1,
+  s"The result value is not correct after bucketing. Expected $e1 but found $r1")
+assert(r2 === e2,
+  s"The result value is not correct after bucketing. Expected $e2 but found $r2")
+}
+  }
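The `handleInvalid = "keep"` semantics exercised in that test can be sketched in plain Scala: values are binned by split boundaries, and NaN is routed to one extra bucket numbered `numBuckets`. The split points below are made up for illustration, not splits Spark would actually fit.

```scala
// Plain-Scala sketch of Bucketizer-style binning with handleInvalid = "keep":
// NaN values are kept and assigned to an extra bucket with index numBuckets.
// The split points here are hypothetical.
val splits = Array(Double.NegativeInfinity, -0.4, 0.15, Double.PositiveInfinity)
val numBuckets = splits.length - 1 // 3 regular buckets

def bucketize(v: Double): Double =
  if (v.isNaN) numBuckets.toDouble // "keep": NaN gets its own bucket
  else splits.lastIndexWhere(_ <= v).toDouble

assert(bucketize(-0.9) == 0.0)
assert(bucketize(0.0) == 1.0)
assert(bucketize(0.5) == 2.0)
assert(bucketize(Double.NaN) == 3.0)
```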

[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401744
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
--- End diff --

nit: No need for the explicit type `DataFrame`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r150401804
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
 val model = discretizer.fit(df)
 assert(model.hasParent)
   }
+
+  test("Multiple Columns: Test observed number of buckets and their sizes match expected values") {
+val spark = this.spark
+import spark.implicits._
+
+val datasetSize = 10
+val numBuckets = 5
+val data1 = Array.range(1, 11, 1).map(_.toDouble)
+val data2 = Array.range(1, 20, 2).map(_.toDouble)
+val data = (0 until 10).map { idx =>
+  (data1(idx), data2(idx))
+}
+val df: DataFrame = data.toSeq.toDF("input1", "input2")
+
+val discretizer = new QuantileDiscretizer()
+  .setInputCols(Array("input1", "input2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setNumBuckets(numBuckets)
+assert(discretizer.isQuantileDiscretizeMultipleColumns())
+val result = discretizer.fit(df).transform(df)
+
+val relativeError = discretizer.getRelativeError
+val isGoodBucket = udf {
+  (size: Int) => math.abs(size - (datasetSize / numBuckets)) <= (relativeError * datasetSize)
+}
+
+for (i <- 1 to 2) {
+  val observedNumBuckets = result.select("result" + i).distinct.count
+  assert(observedNumBuckets === numBuckets,
+    "Observed number of buckets does not equal expected number of buckets.")
+
+  val numGoodBuckets = result.groupBy("result" + i).count.filter(isGoodBucket($"count")).count
+  assert(numGoodBuckets === numBuckets,
+    "Bucket sizes are not within expected relative error tolerance.")
+}
+  }
+
+  test("Multiple Columns: Test on data with high proportion of duplicated values") {
+val spark = this.spark
+import spark.implicits._
+
+val numBuckets = 5
+val expectedNumBucket = 3
+val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0)
+val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 1.0, 2.0)
+val data = (0 until data1.length).map { idx =>
+  (data1(idx), data2(idx))
+}
--- End diff --

Use `data1.zip(data2)`?
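The suggested `data1.zip(data2)` is equivalent to the index-based construction in the quoted diff; a quick plain-Scala check:

```scala
// Verify that data1.zip(data2) builds the same pairs as the index-based map.
val data1 = Array(1.0, 3.0, 2.0, 1.0)
val data2 = Array(1.0, 2.0, 3.0, 1.0)

val byIndex = (0 until data1.length).map(idx => (data1(idx), data2(idx)))
val byZip = data1.zip(data2).toSeq

assert(byIndex == byZip) // identical pairs, with less index bookkeeping
```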


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-11-09 Thread huaxingao
GitHub user huaxingao opened a pull request:

https://github.com/apache/spark/pull/19715

[SPARK-22397][ML]add multiple columns support to QuantileDiscretizer

## What changes were proposed in this pull request?

Add multiple columns support to QuantileDiscretizer.

## How was this patch tested?

Added unit tests in QuantileDiscretizerSuite to test multiple columns support.
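The core of such a change is resolving the single- versus multi-column parameters: exactly one of `inputCol` / `inputCols` may be set, and `outputCols` must match `inputCols` in length. The sketch below is hypothetical and illustrative only; the names and shape are not Spark's actual internals.

```scala
// Hypothetical sketch of the single-/multi-column parameter check.
def resolveInOutCols(
    inputCol: Option[String],
    outputCol: Option[String],
    inputCols: Option[Array[String]],
    outputCols: Option[Array[String]]): (Array[String], Array[String]) =
  (inputCol, inputCols) match {
    case (Some(in), None) =>
      // Single-column mode: default the output name if unset.
      (Array(in), Array(outputCol.getOrElse(in + "_out")))
    case (None, Some(ins)) =>
      // Multi-column mode: output names must pair up with input names.
      val outs = outputCols.getOrElse(ins.map(_ + "_out"))
      require(ins.length == outs.length,
        "inputCols and outputCols must have the same length")
      (ins, outs)
    case _ =>
      throw new IllegalArgumentException(
        "Exactly one of inputCol or inputCols must be set")
  }

val (ins, outs) = resolveInOutCols(None, None, Some(Array("in1", "in2")), Some(Array("r1", "r2")))
assert(ins.sameElements(Array("in1", "in2")))
assert(outs.sameElements(Array("r1", "r2")))

// Setting both column params at once must fail, as the new tests assert.
val bothSet =
  try { resolveInOutCols(Some("in"), None, Some(Array("in1", "in2")), None); false }
  catch { case _: IllegalArgumentException => true }
assert(bothSet)
```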





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huaxingao/spark spark_22397

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19715.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19715


commit 07bd868956e8d63294b2acb0b5d01a7ca2b35866
Author: Huaxin Gao 
Date:   2017-11-10T06:57:04Z

[SPARK-22397][ML]add multiple columns support to QuantileDiscretizer




---
