[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239994064
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
--- End diff --

Since one sorts by string and the other sorts by count, can we really replace them with a single sortBy on a compound key?


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239993480
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
--- End diff --

I'll also add it to the ML migration document.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992942
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends 
MLReadable[StringIndexerModel] {
 override def load(path: String): StringIndexerModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath)
-.select("labels")
-.head()
-  val labels = data.getAs[Seq[String]](0).toArray
-  val model = new StringIndexerModel(metadata.uid, labels)
+
+  val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion)
+  val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) {
--- End diff --

But this needs to change for Spark 3.0 now.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992845
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends 
MLReadable[StringIndexerModel] {
 override def load(path: String): StringIndexerModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath)
-.select("labels")
-.head()
-  val labels = data.getAs[Seq[String]](0).toArray
-  val model = new StringIndexerModel(metadata.uid, labels)
+
+  val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion)
+  val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) {
--- End diff --

This is for loading an old StringIndexerModel saved by a previous Spark version. The old model has `labels`, while the new model has `labelsArray`.
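
Roughly, the version-gated load then looks like the following sketch (condensed from the diff above; the new-format branch is illustrative, not the exact PR code):

    val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion)
    val labelsArray: Array[Array[String]] =
      if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) {
        // Spark <= 2.3 models store a single `labels` column.
        val labels = sparkSession.read.parquet(dataPath)
          .select("labels").head().getAs[Seq[String]](0).toArray
        Array(labels)
      } else {
        // Newer models store `labelsArray` (one label list per input column).
        sparkSession.read.parquet(dataPath)
          .select("labelsArray").head().getAs[Seq[Seq[String]]](0)
          .map(_.toArray).toArray
      }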


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992579
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+}
+  case StringIndexer.frequencyAsc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+}
+  case StringIndexer.alphabetDesc =>
+import dataset.sparkSession.implicits._
+inputCols.map { inputCol =>
+  filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc)
--- End diff --

That's good. I missed this.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992360
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
--- End diff --

Sounds good to me. Added.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992378
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
--- End diff --

Moved to `stringOrderType`'s doc.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239991238
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
--- End diff --

Yes. Thanks.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239889212
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -177,32 +245,47 @@ object StringIndexer extends 
DefaultParamsReadable[StringIndexer] {
 /**
  * Model fitted by [[StringIndexer]].
  *
- * @param labels  Ordered list of labels, corresponding to indices to be 
assigned.
+ * @param labelsArray Array of ordered list of labels, corresponding to 
indices to be assigned
+ *for each input column.
  *
- * @note During transformation, if the input column does not exist,
- * `StringIndexerModel.transform` would return the input dataset 
unmodified.
+ * @note During transformation, if any input column does not exist,
+ * `StringIndexerModel.transform` would skip the input column.
+ * If all input columns do not exist, it returns the input dataset 
unmodified.
  * This is a temporary fix for the case when target labels do not exist 
during prediction.
  */
 @Since("1.4.0")
 class StringIndexerModel (
 @Since("1.4.0") override val uid: String,
-@Since("1.5.0") val labels: Array[String])
+@Since("2.4.0") val labelsArray: Array[Array[String]])
   extends Model[StringIndexerModel] with StringIndexerBase with MLWritable 
{
 
   import StringIndexerModel._
 
   @Since("1.5.0")
-  def this(labels: Array[String]) = this(Identifiable.randomUID("strIdx"), 
labels)
-
-  private val labelToIndex: OpenHashMap[String, Double] = {
-val n = labels.length
-val map = new OpenHashMap[String, Double](n)
-var i = 0
-while (i < n) {
-  map.update(labels(i), i)
-  i += 1
+  def this(labels: Array[String]) = this(Identifiable.randomUID("strIdx"), 
Array(labels))
+
+  @Since("2.4.0")
+  def this(labelsArray: Array[Array[String]]) = 
this(Identifiable.randomUID("strIdx"), labelsArray)
+
+  @Since("1.5.0")
+  def labels: Array[String] = {
+require(labelsArray.length == 1, "This StringIndexerModel is fitted by 
multi-columns, " +
+  "call for `labelsArray` instead.")
+labelsArray(0)
+  }
+
+  // Prepares the maps for string values to corresponding index values.
+  private val labelsToIndexArray: Array[OpenHashMap[String, Double]] = {
+for (labels <- labelsArray) yield {
+  val n = labels.length
+  val map = new OpenHashMap[String, Double](n)
+  var i = 0
--- End diff --

Could we use a `foreach` here or similar to avoid the var + while loop? (And/or put the build-from-seq logic into `OpenHashMap` itself.)
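
For illustration, a var-free version could look like this sketch (assuming Spark's internal `OpenHashMap`; not necessarily what the PR ends up with):

    import org.apache.spark.util.collection.OpenHashMap

    // Build the label -> index map with zipWithIndex + foreach instead of a while loop.
    def buildLabelToIndex(labels: Array[String]): OpenHashMap[String, Double] = {
      val map = new OpenHashMap[String, Double](labels.length)
      labels.zipWithIndex.foreach { case (label, idx) =>
        map.update(label, idx.toDouble)
      }
      map
    }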


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239887472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
--- End diff --

Is this a change of behaviour? If so we should document it in a visible 
place.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239889869
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends 
MLReadable[StringIndexerModel] {
 override def load(path: String): StringIndexerModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath)
-.select("labels")
-.head()
-  val labels = data.getAs[Seq[String]](0).toArray
-  val model = new StringIndexerModel(metadata.uid, labels)
+
+  val (majorVersion, minorVersion) = 
majorMinorVersion(metadata.sparkVersion)
+  val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && 
minorVersion <= 3)) {
--- End diff --

I'm confused by this logic -- are we expecting people to use the MLlib code with a different version of Spark here?


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239885097
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
--- End diff --

minor: This check seems good; should we also add a check that there are no duplicate output fields?
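
Something along these lines, for example (a sketch; the exact wording and placement are up to the PR):

    // Reject duplicate names among the requested output columns.
    def checkNoDuplicateOutputCols(outputColNames: Seq[String]): Unit = {
      require(outputColNames.distinct.length == outputColNames.length,
        s"Output columns should not be duplicated: ${outputColNames.mkString(", ")}")
    }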


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239888373
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+}
+  case StringIndexer.frequencyAsc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+}
+  case StringIndexer.alphabetDesc =>
+import dataset.sparkSession.implicits._
+inputCols.map { inputCol =>
+  filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc)
--- End diff --

This could trigger many actions, and if the input is not cached that's not great. We could cache the `filteredDF` (and explicitly uncache it later) as one solution; what do you think?
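
A sketch of that idea, using the names from the diff above (assumes `spark.implicits._` and `org.apache.spark.sql.functions.col` are in scope):

    // Persist the filtered input once, run one distinct/sort action per column,
    // then release it.
    val cached = filteredDF.cache()
    val labelsPerCol = try {
      inputCols.map { inputCol =>
        cached.select(inputCol).distinct()
          .sort(col(inputCol).desc)
          .as[String].collect()
      }
    } finally {
      cached.unpersist()
    }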


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239887259
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
--- End diff --

So would it make sense to do a single sortBy with a compound expression instead of 2 sorts (and the same below)?
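
For instance, a compound key reproduces the two-pass ordering; a minimal standalone sketch with made-up data (not the PR's code):

    val counts = Seq(("b", 3L), ("a", 3L), ("c", 1L))

    // Two stable sorts: alphabetical first, then by descending frequency.
    val twoSorts = counts.sortBy(_._1).sortBy(-_._2).map(_._1)

    // One sort on a compound key: descending frequency, ties broken alphabetically.
    val oneSort = counts.sortBy { case (label, count) => (-count, label) }.map(_._1)

    assert(twoSorts == oneSort) // both are Seq("a", "b", "c")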


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239888591
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+}
+  case StringIndexer.frequencyAsc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+}
+  case StringIndexer.alphabetDesc =>
+import dataset.sparkSession.implicits._
+inputCols.map { inputCol =>
+  filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc)
+.as[String].collect()
+}
+  case StringIndexer.alphabetAsc =>
+import dataset.sparkSession.implicits._
+inputCols.map { inputCol =>
+  filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").asc)
--- End diff --

Same comment as the Desc case.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239885459
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
--- End diff --

Going to want to update these to 3.0


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-07-19 Thread viirya
GitHub user viirya reopened a pull request:

https://github.com/apache/spark/pull/20146

[SPARK-11215][ML] Add multiple columns support to StringIndexer

## What changes were proposed in this pull request?

This takes over #19621 to add multi-column support to StringIndexer.

## How was this patch tested?

Added tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-11215

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20146


commit bb990f1a6511d8ce20f4fff254dfe0ff43262a10
Author: Liang-Chi Hsieh 
Date:   2018-01-03T03:51:59Z

Add multi-column support to StringIndexer.

commit 26cc94bb335cf0ba3bcdbc2b78effd447026792c
Author: Liang-Chi Hsieh 
Date:   2018-01-07T01:42:28Z

Fix glm test.

commit 540c364d2a70ecd6ee5b92fadedc5e9b85026d2c
Author: Liang-Chi Hsieh 
Date:   2018-01-16T08:20:19Z

Merge remote-tracking branch 'upstream/master' into SPARK-11215

commit 18acbbf7b70b87c75ba62be863580fe9accc23b4
Author: Liang-Chi Hsieh 
Date:   2018-01-24T12:03:26Z

Improve test cases.

commit b884fb5c0ce1e627390d08d8425721ea8e4d
Author: Liang-Chi Hsieh 
Date:   2018-01-27T00:58:06Z

Merge remote-tracking branch 'upstream/master' into SPARK-11215

commit 76ff7bf6054a687abd9fc16c8044020a5454d95f
Author: Liang-Chi Hsieh 
Date:   2018-04-19T11:27:21Z

Merge remote-tracking branch 'upstream/master' into SPARK-11215

commit 50af02eaccce7cecb7c3093d5bc14675ca860c22
Author: Liang-Chi Hsieh 
Date:   2018-04-19T11:30:46Z

Change from 2.3 to 2.4.

commit c1be2c7e28ebdfed580577a108d2f254834caed7
Author: Liang-Chi Hsieh 
Date:   2018-04-23T10:15:49Z

Address comments.

commit ed35d875414ba3cf8751a77463f61665e9c373b0
Author: Liang-Chi Hsieh 
Date:   2018-04-23T14:00:16Z

Address comment.

commit a1dcfda85243a1e2210177f2acfb78821c539b17
Author: Liang-Chi Hsieh 
Date:   2018-04-24T06:41:07Z

Use SQL Aggregator for counting string labels.

commit c1685228f7ec7f4904bca67efaee70498b9894c8
Author: Liang-Chi Hsieh 
Date:   2018-04-25T13:28:24Z

Merge remote-tracking branch 'upstream/master' into SPARK-11215

commit a6551b02a10428d66e0dadcfcb5a8da3798ec814
Author: Liang-Chi Hsieh 
Date:   2018-04-26T04:13:09Z

Drop NA values for both frequency and alphabet order types.

commit c003bd3d6c58cf19249ff0ba9dd10140971d655c
Author: Liang-Chi Hsieh 
Date:   2018-07-18T18:58:21Z

Merge remote-tracking branch 'upstream/master' into SPARK-11215




---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-07-19 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/20146


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183619017
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,57 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
--- End diff --

Using SQL `Aggregator` now.
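
For readers, a rough sketch of what such a counting `Aggregator` can look like (simplified here with plain `Map`s instead of `OpenHashMap`; not the PR's `StringIndexerAggregator` verbatim):

    import org.apache.spark.sql.{Encoder, Encoders, Row}
    import org.apache.spark.sql.expressions.Aggregator

    // Simplified counting aggregator: one (string -> count) map per input column,
    // built in a single pass over the data.
    class StringCountAggregator(numCols: Int)
      extends Aggregator[Row, Array[Map[String, Long]], Array[Map[String, Long]]] {

      def zero: Array[Map[String, Long]] = Array.fill(numCols)(Map.empty[String, Long])

      def reduce(agg: Array[Map[String, Long]], row: Row): Array[Map[String, Long]] = {
        var i = 0
        while (i < numCols) {
          if (!row.isNullAt(i)) {
            val v = row.getString(i)
            agg(i) = agg(i).updated(v, agg(i).getOrElse(v, 0L) + 1L)
          }
          i += 1
        }
        agg
      }

      def merge(a: Array[Map[String, Long]], b: Array[Map[String, Long]]): Array[Map[String, Long]] =
        a.zip(b).map { case (m1, m2) =>
          m2.foldLeft(m1) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0L) + n) }
        }

      def finish(agg: Array[Map[String, Long]]): Array[Map[String, Long]] = agg

      def bufferEncoder: Encoder[Array[Map[String, Long]]] = Encoders.kryo[Array[Map[String, Long]]]
      def outputEncoder: Encoder[Array[Map[String, Long]]] = Encoders.kryo[Array[Map[String, Long]]]
    }

It would then be plugged in the same way as in the diff: select the string-cast input columns and call `.groupBy().agg(agg.toColumn)` with a kryo encoder in scope.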


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183408802
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
--- End diff --

Do you think we should replace `treeAggregate` with `Aggregator` here?


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183404967
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
+(state: Array[OpenHashMap[String, Long]], row: Row) => {
+  for (i <- 0 until inputCols.length) {
+state(i).changeValue(row.getString(i), 1L, _ + 1)
+  }
+  state
+},
+(state1: Array[OpenHashMap[String, Long]], state2: 
Array[OpenHashMap[String, Long]]) => {
+  for (i <- 0 until inputCols.length) {
+state2(i).foreach { case (key: String, count: Long) =>
+  state1(i).changeValue(key, count, _ + count)
+}
+  }
+  state1
+}
+  )
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = countByValueArray.map { countByValue =>
+  $(stringOrderType) match {
+case StringIndexer.frequencyDesc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+case StringIndexer.frequencyAsc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+case StringIndexer.alphabetDesc => 
countByValue.toSeq.map(_._1).sortWith(_ > _).toArray
--- End diff --

Good catch! We don't need to count the labels under 
`StringIndexer.alphabetDesc` and `StringIndexer.alphabetAsc`.
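
i.e. for the alphabet orderings a distinct + sort per column is enough, roughly (a sketch reusing names from the diff; assumes `spark.implicits._` and `functions.col` in scope):

    // Per column: distinct values sorted ascending, no counting needed.
    val labelsAsc: Array[String] = filteredDF.select(inputCol).distinct()
      .sort(col(inputCol).asc)
      .as[String].collect()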


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183393215
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-    .map(_._1).toArray
-  case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-    .map(_._1).toArray
-  case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
--- End diff --

I think it should be doable with SQL `Aggregator`.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183344264
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
 }
+filteredDataset
+  }
 
-val metadata = NominalAttribute.defaultAttr
-  .withName($(outputCol)).withValues(filteredLabels).toMetadata()
-// If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-  case StringIndexer.SKIP_INVALID =>
-val filterer = udf { label: String =>
-  labelToIndex.contains(label)
-}
-(dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol)))), false)
-  case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID)
-}
+  private def getIndexer(labels: Seq[String], labelToIndex: 
OpenHashMap[String, Double]) = {
+val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID)
 
-val indexer = udf { label: String =>
+udf { label: String =>
--- End diff --

> This requires calling many UDFs for different input columns. Should we combine them in one UDF?

Then we would have to define a single UDF that takes a varying number of parameters (like the big pattern match in `ScalaUDF`). We also don't support UDFs with more than 22 parameters.
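
A sketch of the per-column approach that sidesteps the arity problem (one small UDF per column, applied in a single select; names and the output-column suffix are illustrative):

    val indexedCols = inputColNames.zip(labelsToIndexArray).map {
      case (inCol, labelToIndex) =>
        val indexer = udf { label: String => labelToIndex(label) }
        indexer(col(inCol)).as(inCol + "_indexed")
    }
    val result = dataset.select(col("*") +: indexedCols.toSeq: _*)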


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183341186
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
 }
+filteredDataset
+  }
 
-val metadata = NominalAttribute.defaultAttr
-  .withName($(outputCol)).withValues(filteredLabels).toMetadata()
-// If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-  case StringIndexer.SKIP_INVALID =>
-val filterer = udf { label: String =>
-  labelToIndex.contains(label)
-}
-(dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol)))), false)
-  case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID)
-}
+  private def getIndexer(labels: Seq[String], labelToIndex: 
OpenHashMap[String, Double]) = {
+val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID)
 
-val indexer = udf { label: String =>
+udf { label: String =>
--- End diff --

The `filteredDataset` step may not be combinable with the indexer. The indexer UDFs (returning double) transform the data, while the `filteredDataset` logic uses UDFs (returning boolean) to filter out invalid rows.
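
In other words, the two UDFs do different jobs, roughly (a sketch using the names from the diff):

    // Filtering UDF: keeps only rows whose label is known.
    val filterer = udf { label: String => labelToIndex.contains(label) }
    // Indexing UDF: maps a known label to its numeric index.
    val indexer = udf { label: String => labelToIndex(label) }

    val kept    = dataset.where(filterer(dataset(inputColName)))
    val indexed = kept.withColumn(outputColName, indexer(kept(inputColName)))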


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183339386
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
--- End diff --

Ok.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183336177
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
--- End diff --

Looking up a key in a map is not efficient in SQL right now. It is basically a linear search over the key array, if I'm not missing anything.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183334835
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
-StructType(outputFields)
+NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+  schema: StructType,
+  skipNonExistsCol: Boolean = false): StructType = {
+val (inputColNames, outputColNames) = getInOutCols()
+
+val outputFields = for (i <- 0 until inputColNames.length) yield {
--- End diff --

Looks good.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r18752
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
--- End diff --

`ParamValidators.checkSingleVsMultiColumnParams` checks param validity for the multiple-column support.
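
The intent, in simplified form (a sketch; the real checks live in `ParamValidators.checkSingleVsMultiColumnParams`):

    // Single-column and multi-column params are mutually exclusive,
    // and one of the two groups must be set.
    require(!(isSet(inputCol) && isSet(inputCols)),
      "Only one of `inputCol` and `inputCols` should be set.")
    require(isSet(inputCol) || isSet(inputCols),
      "One of `inputCol` or `inputCols` must be set.")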


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257701
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
--- End diff --

Add one empty line for readability.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183255152
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
+(state: Array[OpenHashMap[String, Long]], row: Row) => {
+  for (i <- 0 until inputCols.length) {
+state(i).changeValue(row.getString(i), 1L, _ + 1)
+  }
+  state
+},
+(state1: Array[OpenHashMap[String, Long]], state2: 
Array[OpenHashMap[String, Long]]) => {
+  for (i <- 0 until inputCols.length) {
+state2(i).foreach { case (key: String, count: Long) =>
+  state1(i).changeValue(key, count, _ + count)
+}
+  }
+  state1
+}
+  )
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = countByValueArray.map { countByValue =>
+  $(stringOrderType) match {
+case StringIndexer.frequencyDesc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+case StringIndexer.frequencyAsc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+case StringIndexer.alphabetDesc => 
countByValue.toSeq.map(_._1).sortWith(_ > _).toArray
--- End diff --

I think we can break the code into two paths: one sorts by frequency, which requires computing the counts, and the other sorts by alphabet, which only requires `distinct`. We could move the `countByValueArray` code into `labelsArray`.
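
For illustration only, a rough sketch of that split (hypothetical code, not the PR's; `countByValues(dataset, inputCols)` stands in for the `treeAggregate`-based counting shown in the diff above, returning one label-to-count map per input column, and `col`/`StringType` are assumed in scope as in `fit`):

```scala
// Hypothetical sketch of the suggested two-path structure.
val labelsArray: Array[Array[String]] = $(stringOrderType) match {
  case StringIndexer.frequencyDesc =>
    countByValues(dataset, inputCols)
      .map(_.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray)
  case StringIndexer.frequencyAsc =>
    countByValues(dataset, inputCols)
      .map(_.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray)
  case StringIndexer.alphabetDesc =>
    // Alphabetical orderings only need the distinct values, no counting.
    inputCols.map { c =>
      dataset.na.drop(inputCols).select(col(c).cast(StringType))
        .distinct().collect().map(_.getString(0)).sortWith(_ > _)
    }
  case StringIndexer.alphabetAsc =>
    inputCols.map { c =>
      dataset.na.drop(inputCols).select(col(c).cast(StringType))
        .distinct().collect().map(_.getString(0)).sortWith(_ < _)
    }
}
```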


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253488
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
--- End diff --

If both `inputCol` and `inputCols` are set, throw an exception.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r18325
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
--- End diff --

Isn't this very expensive? We basically look up `labelToIndex` twice.

It would be cool if we supported `lit(Map())` so we could do those lookups natively in SQL, and also do the `na.drop` together in whole-stage codegen.
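
As an aside, one hedged sketch of avoiding the double lookup today is a single udf that both validates and indexes, returning `Option[Double]` (assumption: `labelToIndex` here is an ordinary `Map[String, Double]`, since `OpenHashMap` has no `get`; `inputColName`/`outputColName` are the per-column names):

```scala
// Hypothetical sketch: one lookup per row; None (null in the output column)
// marks an invalid label, which is then dropped with a single filter.
import org.apache.spark.sql.functions.{col, udf}

val indexOrNull = udf { label: String => labelToIndex.get(label) }

val indexed = dataset
  .withColumn(outputColName, indexOrNull(col(inputColName)))
  .where(col(outputColName).isNotNull)
```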


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253932
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
-StructType(outputFields)
+NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+  schema: StructType,
+  skipNonExistsCol: Boolean = false): StructType = {
+val (inputColNames, outputColNames) = getInOutCols()
+
+val outputFields = for (i <- 0 until inputColNames.length) yield {
+  if (schema.fieldNames.contains(inputColNames(i))) {
+validateAndTransformField(schema, inputColNames(i), 
outputColNames(i))
+  } else {
+if (skipNonExistsCol) {
+  null
+} else {
+  throw new SparkException(s"Input column ${inputColNames(i)} does 
not exist.")
+}
+  }
+}
+StructType(schema.fields ++ outputFields.filter(_ != null))
--- End diff --

Then you don't need to filter with the above code.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257676
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
--- End diff --

Please add a comment explaining what counts as invalid data here.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183254078
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
--- End diff --

Is it possible to aggregate natively with SQL? I don't think we would compromise performance by using SQL aggregation such as `groupBy`, `agg`, and `countDistinct` instead of tree aggregation, since the states will be very small in this use case.
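
For reference, a minimal sketch of the plain-SQL alternative (hypothetical, not the final PR code; one aggregation pass per input column, with `dataset`, `inputCols`, `col`, and `StringType` in scope as in `fit`):

```scala
// Hypothetical sketch: per-column label counts via groupBy/count instead of
// rdd.treeAggregate. The collected per-column maps stay small, as noted above.
val countByValueArray: Array[Map[String, Long]] = inputCols.map { c =>
  dataset.na.drop(inputCols)
    .groupBy(col(c).cast(StringType).as("label"))
    .count()
    .collect()
    .map(row => row.getString(0) -> row.getLong(1))
    .toMap
}
```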


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183258353
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
 }
+filteredDataset
+  }
 
-val metadata = NominalAttribute.defaultAttr
-  .withName($(outputCol)).withValues(filteredLabels).toMetadata()
-// If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-  case StringIndexer.SKIP_INVALID =>
-val filterer = udf { label: String =>
-  labelToIndex.contains(label)
-}
-
(dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol, 
false)
-  case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID)
-}
+  private def getIndexer(labels: Seq[String], labelToIndex: 
OpenHashMap[String, Double]) = {
+val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID)
 
-val indexer = udf { label: String =>
+udf { label: String =>
--- End diff --

This requires calling many udfs for the different input columns. Should we combine them into one udf? The `filteredDataset` logic could go in there as well, to avoid multiple lookups.
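
A rough sketch of that combination (hypothetical code, not the PR's; `__indices` is an illustrative temporary column, `outputColNames` comes from `getInOutCols()`, and it assumes invalid labels were already filtered out):

```scala
// Hypothetical sketch: one udf indexes all input columns at once via an array
// of the string-cast inputs, instead of one udf per column.
import org.apache.spark.sql.functions.{array, col, udf}

val indexAll = udf { labels: Seq[String] =>
  labels.zipWithIndex.map { case (label, i) => labelsToIndexArray(i)(label) }
}

val withIndices = filteredDataset.withColumn("__indices",
  indexAll(array(inputColNames.map(c => col(c).cast(StringType)): _*)))

val result = outputColNames.zipWithIndex.foldLeft(withIndices) {
  case (df, (outputColName, i)) => df.withColumn(outputColName, col("__indices")(i))
}.drop("__indices")
```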


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257799
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
--- End diff --

Is it possible to avoid using `var filteredDataset`?
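
For example, a hedged sketch with `foldLeft` (assuming the same `inputColNames`/`labelsToIndexArray` pairing as in the diff above):

```scala
// Hypothetical sketch: fold over the columns so no var is needed.
import org.apache.spark.sql.functions.udf

val existingCols = inputColNames.filter(dataset.schema.fieldNames.contains(_))
val filteredDataset = inputColNames.zip(labelsToIndexArray)
  .foldLeft(dataset.na.drop(existingCols)) {
    case (df, (inputColName, labelToIndex)) =>
      val filterer = udf { label: String => labelToIndex.contains(label) }
      df.where(filterer(df(inputColName)))
  }
```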


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253904
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
-StructType(outputFields)
+NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+  schema: StructType,
+  skipNonExistsCol: Boolean = false): StructType = {
+val (inputColNames, outputColNames) = getInOutCols()
+
+val outputFields = for (i <- 0 until inputColNames.length) yield {
--- End diff --

Nit, why not the following for readability? 

```scala
val outputFields = inputColNames.zip(outputColNames).flatMap {
  case (inputColName, outputColName) =>
    schema.fieldNames.contains(inputColName) match {
      case true => Some(validateAndTransformField(schema, inputColName, outputColName))
      case false if skipNonExistsCol => None
      case false => throw new SparkException(s"Input column $inputColName does not exist.")
    }
}
```


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r163528221
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -331,4 +357,51 @@ class StringIndexerSuite
 val dfWithIndex = model.transform(dfNoBristol)
 assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1)
   }
+
+  test("StringIndexer multiple input columns") {
--- End diff --

Good point. I added one test for this.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r163528239
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -33,12 +33,38 @@ class StringIndexerSuite
 
   test("params") {
 ParamsSuite.checkParams(new StringIndexer)
-val model = new StringIndexerModel("indexer", Array("a", "b"))
+val model = new StringIndexerModel("indexer", Array(Array("a", "b")))
 val modelWithoutUid = new StringIndexerModel(Array("a", "b"))
 ParamsSuite.checkParams(model)
 ParamsSuite.checkParams(modelWithoutUid)
   }
 
+  test("params: input/output columns") {
+val stringIndexerSingleCol = new StringIndexer()
+  .setInputCol("in").setOutputCol("out")
+val inOutCols1 = stringIndexerSingleCol.getInOutCols()
+assert(inOutCols1._1 === Array("in"))
+assert(inOutCols1._2 === Array("out"))
+
+val stringIndexerMultiCol = new StringIndexer()
+  .setInputCols(Array("in1", "in2")).setOutputCols(Array("out1", 
"out2"))
+val inOutCols2 = stringIndexerMultiCol.getInOutCols()
+assert(inOutCols2._1 === Array("in1", "in2"))
+assert(inOutCols2._2 === Array("out1", "out2"))
+
+intercept[IllegalArgumentException] {
+  new StringIndexer().setInputCol("in").setOutputCols(Array("out1", 
"out2")).getInOutCols()
--- End diff --

Ok.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r162777500
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -249,6 +249,16 @@ object ParamValidators {
   def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { 
(value: Array[T]) =>
 value.length > lowerBound
   }
+
+  /** Check if more than one param in a set of exclusive params are set. */
+  def checkExclusiveParams(model: Params, params: String*): Unit = {
+if (params.filter(paramName => model.hasParam(paramName) &&
--- End diff --

The purpose of this method is to check whether more than one Param is set among a set of exclusive Params within a Model. Is it useful to pass an irrelevant Param into the exclusive Params to check? Since we already know which Params the model has, it sounds like we would be checking an irrelevant Param that we already know does not exist.



---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r162715788
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -249,6 +249,16 @@ object ParamValidators {
   def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { 
(value: Array[T]) =>
 value.length > lowerBound
   }
+
+  /** Check if more than one param in a set of exclusive params are set. */
+  def checkExclusiveParams(model: Params, params: String*): Unit = {
+if (params.filter(paramName => model.hasParam(paramName) &&
--- End diff --

Why is this checking to see if the Param belongs to the Model?  If this 
method is called with irrelevant Params, shouldn't it throw an error?
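
To make the question concrete, a hedged sketch of the stricter behaviour being asked about (discussion only, not the PR's implementation): fail on unknown params first, then enforce exclusivity.

```scala
// Hypothetical sketch of checkExclusiveParams with strict handling of
// irrelevant params.
def checkExclusiveParams(model: Params, params: String*): Unit = {
  val unknown = params.filterNot(model.hasParam)
  require(unknown.isEmpty,
    s"Params ${unknown.mkString(", ")} do not belong to $model.")
  val set = params.filter(name => model.isSet(model.getParam(name)))
  require(set.length <= 1,
    s"Only one of ${params.mkString(", ")} may be set, but got: ${set.mkString(", ")}.")
}
```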


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r161039325
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -33,12 +33,38 @@ class StringIndexerSuite
 
   test("params") {
 ParamsSuite.checkParams(new StringIndexer)
-val model = new StringIndexerModel("indexer", Array("a", "b"))
+val model = new StringIndexerModel("indexer", Array(Array("a", "b")))
 val modelWithoutUid = new StringIndexerModel(Array("a", "b"))
 ParamsSuite.checkParams(model)
 ParamsSuite.checkParams(modelWithoutUid)
   }
 
+  test("params: input/output columns") {
+val stringIndexerSingleCol = new StringIndexer()
+  .setInputCol("in").setOutputCol("out")
+val inOutCols1 = stringIndexerSingleCol.getInOutCols()
+assert(inOutCols1._1 === Array("in"))
+assert(inOutCols1._2 === Array("out"))
+
+val stringIndexerMultiCol = new StringIndexer()
+  .setInputCols(Array("in1", "in2")).setOutputCols(Array("out1", 
"out2"))
+val inOutCols2 = stringIndexerMultiCol.getInOutCols()
+assert(inOutCols2._1 === Array("in1", "in2"))
+assert(inOutCols2._2 === Array("out1", "out2"))
+
+intercept[IllegalArgumentException] {
+  new StringIndexer().setInputCol("in").setOutputCols(Array("out1", 
"out2")).getInOutCols()
--- End diff --

It seems better to call `stringIndexer.fit` and check the exception thrown.
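
Something along these lines, presumably (hypothetical test sketch; `df` is any DataFrame with an `in` column):

```scala
// Hypothetical sketch: exercise the check through fit() rather than calling
// the private getInOutCols() helper directly.
intercept[IllegalArgumentException] {
  new StringIndexer()
    .setInputCol("in")
    .setOutputCols(Array("out1", "out2"))
    .fit(df)
}
```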


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r161040131
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -249,6 +249,16 @@ object ParamValidators {
   def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { 
(value: Array[T]) =>
 value.length > lowerBound
   }
+
+  /** Check if more than one param in a set of exclusive params are set. */
+  def checkExclusiveParams(model: Params, params: String*): Unit = {
--- End diff --

@hhbyyh Is this the common helper method you want for checking single/multiple input/output columns?


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r161040537
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -331,4 +357,51 @@ class StringIndexerSuite
 val dfWithIndex = model.transform(dfNoBristol)
 assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1)
   }
+
+  test("StringIndexer multiple input columns") {
--- End diff --

Is there a test case for `frequencyAsc/Desc` verifying that, when frequencies are equal, it does a secondary sort by alphabet?
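
A hedged sketch of such a test (hypothetical, assuming the suite's `testImplicits` for `toDF` and that `model.labels` still exposes the single-column labels): "b" and "c" both appear twice, so under `frequencyDesc` the tie should be broken alphabetically.

```scala
// Hypothetical test sketch: with counts a=3, b=2, c=2, frequencyDesc should
// yield Array("a", "b", "c"), i.e. the b/c tie is broken alphabetically.
val df = Seq("a", "a", "a", "b", "b", "c", "c").toDF("in")
val model = new StringIndexer()
  .setInputCol("in").setOutputCol("out")
  .setStringOrderType("frequencyDesc")
  .fit(df)
assert(model.labels === Array("a", "b", "c"))
```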


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r160066219
  
--- Diff: R/pkg/tests/fulltests/test_mllib_regression.R ---
@@ -126,15 +134,15 @@ test_that("spark.glm summary", {
 
   out <- capture.output(print(stats))
   expect_match(out[2], "Deviance Residuals:")
-  expect_true(any(grepl("AIC: 59.22", out)))
+  expect_true(any(grepl("AIC: 35.84", out)))
--- End diff --

R glm's AIC: 35.839:
```R
> out <- capture.output(print(rStats))
> out
 [1] "" 
   
 [2] "Call:"
   
 [3] "glm(formula = Sepal.Width ~ Sepal.Length + Species, data = dataset)"  
   
 [4] "" 
   
 [5] "Deviance Residuals: " 
   
 [6] "  1234567
8  "
 [7] " 0.  -1.4932   1.5491   0.5411  -0.8581  -1.2228  -0.5969   
2.0809  "
 [8] "" 
   
 [9] "Coefficients:"
   
[10] "  Estimate Std. Error t value Pr(>|t|)"   
   
[11] "(Intercept) 1.7150 2.0492   0.8370.450"   
   
[12] "Sepal.Length0.1925 0.5566   0.3460.747"   
   
[13] "Speciesversicolor   1.7894 1.9240   0.9300.405"   
   
[14] "Speciesvirginica1.2613 2.0735   0.6080.576"   
   
[15] "" 
   
[16] "(Dispersion parameter for gaussian family taken to be 2.960032)"  
   
[17] "" 
   
[18] "Null deviance: 14.719  on 7  degrees of freedom"  
   
[19] "Residual deviance: 11.840  on 4  degrees of freedom"  
   
[20] "AIC: 35.839"  
   
[21] "" 
   
[22] "Number of Fisher Scoring iterations: 2"   
   
[23] "" 
   
```

  


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r160066433
  
--- Diff: R/pkg/tests/fulltests/test_mllib_regression.R ---
@@ -174,17 +182,17 @@ test_that("spark.glm summary", {
   expect_equal(stats$aic, rStats$aic)
 
   # Test spark.glm works with offset
-  training <- suppressWarnings(createDataFrame(iris))
+  training <- suppressWarnings(createDataFrame(dataset))
   stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + 
Species,
  family = poisson(), offsetCol = 
"Petal_Length"))
   rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + 
Species,
-data = iris, family = poisson(), offset = 
iris$Petal.Length)))
+data = dataset, family = poisson(), offset = 
dataset$Petal.Length)))
   expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))
 
   # Test summary works on base GLM models
-  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = 
iris)
+  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = 
dataset)
   baseSummary <- summary(baseModel)
-  expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+  expect_true(abs(baseSummary$deviance - 11.84013) < 1e-4)
--- End diff --

R glm:
```R
> baseSummary <- summary(stats::glm(Sepal.Width ~ Sepal.Length + Species, 
data = dataset))
> baseSummary$deviance
[1] 11.84013
```

Spark glm:
```R
> baseSummary <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + 
Species))
> baseSummary$deviance
[1] 11.84013
```


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r160049837
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
--- End diff --

Ran it again with the config:

```R
> sparkR.conf("spark.sparkr.use.daemon")
 
$spark.sparkr.use.daemon
[1] "false"
> start.time <- Sys.time()
> model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
> end.time <- Sys.time()
> time.taken <- end.time - start.time
> time.taken
Time difference of 1.704288 secs
```

```R
> start.time <- Sys.time()
> model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
> end.time <- Sys.time()
> time.taken <- end.time - start.time
> time.taken
Time difference of 5.135418 secs
```




---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r160048655
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
--- End diff --

Ahh, @viirya, would you mind checking it again after setting `spark.sparkr.use.daemon` to `false` too?


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r160039165
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
--- End diff --

```R
> start.time <- Sys.time()
> model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
> end.time <- Sys.time()
> time.taken <- end.time - start.time
> time.taken
Time difference of 1.780564 secs
```

```R
> start.time <- Sys.time()
> model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
> end.time <- Sys.time()
> time.taken <- end.time - start.time
> time.taken
Time difference of 5.728089 secs
```


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r159584417
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
--- End diff --

Can you check whether the run time increases significantly? This was an issue before; see SPARK-21693.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r159580805
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -313,7 +313,7 @@ test_that("spark.mlp", {
   # Test predict method
--- End diff --

Actually, I think we could remove this `Test predict method` test. It seems to me that, with `tol = 0.5`, the predictions may not be very meaningful.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r159579587
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -313,7 +313,7 @@ test_that("spark.mlp", {
   # Test predict method
   mlpTestDF <- df
   mlpPredictions <- collect(select(predict(model, mlpTestDF), 
"prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", 
"0.0", "0.0", "0.0"))
+  expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", 
"1.0", "1.0", "1.0"))
--- End diff --

I checked the predictions. All `0.0` became `1.0`, and all `1.0` became `0.0`.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r159578004
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter 
= 100)
--- End diff --

It seems `maxIter = 10` is not stable. I increased it to 100 to stabilize the predictions.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r159577911
  
--- Diff: R/pkg/tests/fulltests/test_mllib_classification.R ---
@@ -313,7 +313,7 @@ test_that("spark.mlp", {
   # Test predict method
   mlpTestDF <- df
   mlpPredictions <- collect(select(predict(model, mlpTestDF), 
"prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", 
"0.0", "0.0", "0.0"))
+  expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", 
"1.0", "1.0", "1.0"))
--- End diff --

This is due to the change in how we sort string labels with the same frequency under the frequencyDesc/Asc settings.
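
A small illustration of the new tie-breaking (hypothetical counts, using the labels from this test):

```scala
// With equal counts, frequencyDesc now falls back to ascending alphabetical
// order, so the label that previously got index 0 can flip.
val counts = Seq("1.0" -> 4L, "0.0" -> 4L)
val labels = counts.sortBy(_._1).sortBy(-_._2).map(_._1)
// labels == List("0.0", "1.0"): "0.0" maps to index 0.0, "1.0" to index 1.0
```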


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-01-03 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/20146

[SPARK-11215][ML] Add multiple columns support to StringIndexer

## What changes were proposed in this pull request?

This takes over #19621 to add multi-column support to StringIndexer.

## How was this patch tested?

Added tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-11215

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20146


commit bb990f1a6511d8ce20f4fff254dfe0ff43262a10
Author: Liang-Chi Hsieh 
Date:   2018-01-03T03:51:59Z

Add multi-column support to StringIndexer.




---
