[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239994064 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. 
+val labelsArray = $(stringOrderType) match { + case StringIndexer.frequencyDesc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray --- End diff -- As one sorts by string and another one sorts by count, can we replace them with a compound expression with a single sortBy? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239993480 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. --- End diff -- I'll also add it to ml migration document. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992942 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -310,11 +439,23 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] { override def load(path: String): StringIndexerModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath) -.select("labels") -.head() - val labels = data.getAs[Seq[String]](0).toArray - val model = new StringIndexerModel(metadata.uid, labels) + + val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion) + val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) { --- End diff -- But this needs to change for Spark 3.0 now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992845 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -310,11 +439,23 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] { override def load(path: String): StringIndexerModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath) -.select("labels") -.head() - val labels = data.getAs[Seq[String]](0).toArray - val model = new StringIndexerModel(metadata.uid, labels) + + val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion) + val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) { --- End diff -- This is for loading old StringIndexerModel saved by previous Spark. Previous model has `labels`, but new model has `labelsArray`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992579 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. 
+val labelsArray = $(stringOrderType) match { + case StringIndexer.frequencyDesc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +} + case StringIndexer.frequencyAsc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +} + case StringIndexer.alphabetDesc => +import dataset.sparkSession.implicits._ +inputCols.map { inputCol => + filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc) --- End diff -- That's good. I missed this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992360 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"The number of input columns does not match output columns") + ($(inputCols), $(outputCols)) +} + } + + private def validateAndTransformField( + schema: StructType, + inputColName: String, + outputColName: String): StructField = { val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], s"The input column $inputColName must be either string type or numeric type, " + s"but got $inputDataType.") -val inputFields = schema.fields -val outputColName = $(outputCol) -require(inputFields.forall(_.name != outputColName), +require(schema.fields.forall(_.name != outputColName), --- End diff -- Sounds good to me. Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992378 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. --- End diff -- Moved to `stringOrderType`'s doc. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239991238 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") --- End diff -- Yes. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239889212 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -177,32 +245,47 @@ object StringIndexer extends DefaultParamsReadable[StringIndexer] { /** * Model fitted by [[StringIndexer]]. * - * @param labels Ordered list of labels, corresponding to indices to be assigned. + * @param labelsArray Array of ordered list of labels, corresponding to indices to be assigned + *for each input column. * - * @note During transformation, if the input column does not exist, - * `StringIndexerModel.transform` would return the input dataset unmodified. + * @note During transformation, if any input column does not exist, + * `StringIndexerModel.transform` would skip the input column. + * If all input columns do not exist, it returns the input dataset unmodified. * This is a temporary fix for the case when target labels do not exist during prediction. */ @Since("1.4.0") class StringIndexerModel ( @Since("1.4.0") override val uid: String, -@Since("1.5.0") val labels: Array[String]) +@Since("2.4.0") val labelsArray: Array[Array[String]]) extends Model[StringIndexerModel] with StringIndexerBase with MLWritable { import StringIndexerModel._ @Since("1.5.0") - def this(labels: Array[String]) = this(Identifiable.randomUID("strIdx"), labels) - - private val labelToIndex: OpenHashMap[String, Double] = { -val n = labels.length -val map = new OpenHashMap[String, Double](n) -var i = 0 -while (i < n) { - map.update(labels(i), i) - i += 1 + def this(labels: Array[String]) = this(Identifiable.randomUID("strIdx"), Array(labels)) + + @Since("2.4.0") + def this(labelsArray: Array[Array[String]]) = this(Identifiable.randomUID("strIdx"), labelsArray) + + @Since("1.5.0") + def labels: Array[String] = { +require(labelsArray.length == 1, "This StringIndexerModel is fitted by multi-columns, " + + "call for `labelsArray` instead.") +labelsArray(0) + } + + 
// Prepares the maps for string values to corresponding index values. + private val labelsToIndexArray: Array[OpenHashMap[String, Double]] = { +for (labels <- labelsArray) yield { + val n = labels.length + val map = new OpenHashMap[String, Double](n) + var i = 0 --- End diff -- We could use a `foreach` here or similar to avoid the var + loop? (And or put from seq logic in the openhashmap). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239887472 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. --- End diff -- Is this a change of behaviour? 
If so, we should document it in a visible place. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239889869 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -310,11 +439,23 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] { override def load(path: String): StringIndexerModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath) -.select("labels") -.head() - val labels = data.getAs[Seq[String]](0).toArray - val model = new StringIndexerModel(metadata.uid, labels) + + val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion) + val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) { --- End diff -- I'm confused by this logic -- are we expecting people to use the MLlib code with different version of Spark here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239885097 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"The number of input columns does not match output columns") + ($(inputCols), $(outputCols)) +} + } + + private def validateAndTransformField( + schema: StructType, + inputColName: String, + outputColName: String): StructField = { val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], s"The input column $inputColName must be either string type or numeric type, " + s"but got $inputDataType.") -val inputFields = schema.fields -val outputColName = $(outputCol) -require(inputFields.forall(_.name != outputColName), +require(schema.fields.forall(_.name != outputColName), --- End diff -- minor: So this check seems good, should we add a check that there are not any duplicate output fields? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239888373 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. 
+val labelsArray = $(stringOrderType) match { + case StringIndexer.frequencyDesc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +} + case StringIndexer.frequencyAsc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +} + case StringIndexer.alphabetDesc => +import dataset.sparkSession.implicits._ +inputCols.map { inputCol => + filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc) --- End diff -- This could trigger many actions and if the input is not cached that's not great. We could cache the filteredDF (and explicitily uncache later) as one solution, what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239887259 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. 
+val labelsArray = $(stringOrderType) match { + case StringIndexer.frequencyDesc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray --- End diff -- So would it make sense to do a single sortBy with a compound expression instead of 2 sorts (and same below)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239888591 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) +implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]] + +dataset.select(inputCols.map(col(_).cast(StringType)): _*) + .toDF + .groupBy().agg(aggregator.toColumn) + .as[Array[OpenHashMap[String, Long]]] + .collect()(0) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() + +val filteredDF = dataset.na.drop(inputCols) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. 
+val labelsArray = $(stringOrderType) match { + case StringIndexer.frequencyDesc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +} + case StringIndexer.frequencyAsc => +countByValue(filteredDF, inputCols).map { counts => + counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +} + case StringIndexer.alphabetDesc => +import dataset.sparkSession.implicits._ +inputCols.map { inputCol => + filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc) +.as[String].collect() +} + case StringIndexer.alphabetAsc => +import dataset.sparkSession.implicits._ +inputCols.map { inputCol => + filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").asc) --- End diff -- Same comment as the Desc case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239885459 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") --- End diff -- Going to want to update these to 3.0 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
GitHub user viirya reopened a pull request: https://github.com/apache/spark/pull/20146 [SPARK-11215][ML] Add multiple columns support to StringIndexer ## What changes were proposed in this pull request? This takes over #19621 to add multi-column support to StringIndexer. ## How was this patch tested? Added tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-11215 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20146.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20146 commit bb990f1a6511d8ce20f4fff254dfe0ff43262a10 Author: Liang-Chi Hsieh Date: 2018-01-03T03:51:59Z Add multi-column support to StringIndexer. commit 26cc94bb335cf0ba3bcdbc2b78effd447026792c Author: Liang-Chi Hsieh Date: 2018-01-07T01:42:28Z Fix glm test. commit 540c364d2a70ecd6ee5b92fadedc5e9b85026d2c Author: Liang-Chi Hsieh Date: 2018-01-16T08:20:19Z Merge remote-tracking branch 'upstream/master' into SPARK-11215 commit 18acbbf7b70b87c75ba62be863580fe9accc23b4 Author: Liang-Chi Hsieh Date: 2018-01-24T12:03:26Z Improve test cases. commit b884fb5c0ce1e627390d08d8425721ea8e4d Author: Liang-Chi Hsieh Date: 2018-01-27T00:58:06Z Merge remote-tracking branch 'upstream/master' into SPARK-11215 commit 76ff7bf6054a687abd9fc16c8044020a5454d95f Author: Liang-Chi Hsieh Date: 2018-04-19T11:27:21Z Merge remote-tracking branch 'upstream/master' into SPARK-11215 commit 50af02eaccce7cecb7c3093d5bc14675ca860c22 Author: Liang-Chi Hsieh Date: 2018-04-19T11:30:46Z Change from 2.3 to 2.4. commit c1be2c7e28ebdfed580577a108d2f254834caed7 Author: Liang-Chi Hsieh Date: 2018-04-23T10:15:49Z Address comments. commit ed35d875414ba3cf8751a77463f61665e9c373b0 Author: Liang-Chi Hsieh Date: 2018-04-23T14:00:16Z Address comment. 
commit a1dcfda85243a1e2210177f2acfb78821c539b17 Author: Liang-Chi Hsieh Date: 2018-04-24T06:41:07Z Use SQL Aggregator for counting string labels. commit c1685228f7ec7f4904bca67efaee70498b9894c8 Author: Liang-Chi Hsieh Date: 2018-04-25T13:28:24Z Merge remote-tracking branch 'upstream/master' into SPARK-11215 commit a6551b02a10428d66e0dadcfcb5a8da3798ec814 Author: Liang-Chi Hsieh Date: 2018-04-26T04:13:09Z Drop NA values for both frequency and alphabet order types. commit c003bd3d6c58cf19249ff0ba9dd10140971d655c Author: Liang-Chi Hsieh Date: 2018-07-18T18:58:21Z Merge remote-tracking branch 'upstream/master' into SPARK-11215 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/20146 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183619017 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +159,57 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def countByValue( + dataset: Dataset[_], + inputCols: Array[String]): Array[OpenHashMap[String, Long]] = { + +val aggregator = new StringIndexerAggregator(inputCols.length) --- End diff -- Use SQL `Aggregator` now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183408802 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. +val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( --- End diff -- Do you think we should replace `treeAggregate` with `Aggregator` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183404967 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. 
+val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( +(state: Array[OpenHashMap[String, Long]], row: Row) => { + for (i <- 0 until inputCols.length) { +state(i).changeValue(row.getString(i), 1L, _ + 1) + } + state +}, +(state1: Array[OpenHashMap[String, Long]], state2: Array[OpenHashMap[String, Long]]) => { + for (i <- 0 until inputCols.length) { +state2(i).foreach { case (key: String, count: Long) => + state1(i).changeValue(key, count, _ + count) +} + } + state1 +} + ) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. +val labelsArray = countByValueArray.map { countByValue => + $(stringOrderType) match { +case StringIndexer.frequencyDesc => + countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +case StringIndexer.frequencyAsc => + countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +case StringIndexer.alphabetDesc => countByValue.toSeq.map(_._1).sortWith(_ > _).toArray --- End diff -- Good catch! We don't need to count the labels under `StringIndexer.alphabetDesc` and `StringIndexer.alphabetAsc`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183393215 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. +val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( --- End diff -- I think it should be doable with SQL `Aggregator`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183344264 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) } +filteredDataset + } -val metadata = NominalAttribute.defaultAttr - .withName($(outputCol)).withValues(filteredLabels).toMetadata() -// If we are skipping invalid records, filter them out. 
-val (filteredDataset, keepInvalid) = $(handleInvalid) match { - case StringIndexer.SKIP_INVALID => -val filterer = udf { label: String => - labelToIndex.contains(label) -} - (dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol, false) - case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID) -} + private def getIndexer(labels: Seq[String], labelToIndex: OpenHashMap[String, Double]) = { +val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID) -val indexer = udf { label: String => +udf { label: String => --- End diff -- > This requires calling many udf for different input columns. Should we combine then in one udf? Then we must define the single UDF with different parameter number (looks like the big pattern matching in `ScalaUDF`). We also don't support UDFs with more than 22 parameters. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183341186 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) } +filteredDataset + } -val metadata = NominalAttribute.defaultAttr - .withName($(outputCol)).withValues(filteredLabels).toMetadata() -// If we are skipping invalid records, filter them out. 
-val (filteredDataset, keepInvalid) = $(handleInvalid) match { - case StringIndexer.SKIP_INVALID => -val filterer = udf { label: String => - labelToIndex.contains(label) -} - (dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol, false) - case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID) -} + private def getIndexer(labels: Seq[String], labelToIndex: OpenHashMap[String, Double]) = { +val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID) -val indexer = udf { label: String => +udf { label: String => --- End diff -- `filteredDataset` may not be able to put with indexer together. Indexer UDFs (returning double) do transformation to the data, but `filteredDataset` logic uses the UDFs (returning boolean) to filter invalid rows. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183339386 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) --- End diff -- Ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183336177 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) --- End diff -- Lookup for a key in a map is not efficient in SQL now. It is basically a linear search in a key array if I don't miss anything. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183334835 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"The number of input columns does not match output columns") + ($(inputCols), $(outputCols)) +} + } + + private def validateAndTransformField( + schema: StructType, + inputColName: String, + outputColName: String): StructField = { val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], s"The input column $inputColName must be either string type or numeric type, " + s"but got $inputDataType.") -val inputFields = schema.fields -val outputColName = $(outputCol) -require(inputFields.forall(_.name != outputColName), +require(schema.fields.forall(_.name != outputColName), s"Output column $outputColName already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() -StructType(outputFields) +NominalAttribute.defaultAttr.withName($(outputCol)).toStructField() + } + + /** Validates and transforms the input schema. 
*/ + protected def validateAndTransformSchema( + schema: StructType, + skipNonExistsCol: Boolean = false): StructType = { +val (inputColNames, outputColNames) = getInOutCols() + +val outputFields = for (i <- 0 until inputColNames.length) yield { --- End diff -- Looks good. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r18752 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { --- End diff -- `ParamValidators.checkSingleVsMultiColumnParams` checks the param validity under multiple columns support. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257701 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) --- End diff -- add one empty line for readability --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183255152 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. 
+val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( +(state: Array[OpenHashMap[String, Long]], row: Row) => { + for (i <- 0 until inputCols.length) { +state(i).changeValue(row.getString(i), 1L, _ + 1) + } + state +}, +(state1: Array[OpenHashMap[String, Long]], state2: Array[OpenHashMap[String, Long]]) => { + for (i <- 0 until inputCols.length) { +state2(i).foreach { case (key: String, count: Long) => + state1(i).changeValue(key, count, _ + count) +} + } + state1 +} + ) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. +val labelsArray = countByValueArray.map { countByValue => + $(stringOrderType) match { +case StringIndexer.frequencyDesc => + countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +case StringIndexer.frequencyAsc => + countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +case StringIndexer.alphabetDesc => countByValue.toSeq.map(_._1).sortWith(_ > _).toArray --- End diff -- I think we can break the code into two paths. One is sorting by frequency which requires to compute the counts, and the other is sorting by alphabet which only requires distinct. We could move the `countByValueArray` code into labelsArray. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253488 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { --- End diff -- If both `inputCol` and `inputCols` are set, throw an exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r18325 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) --- End diff -- Isn't this very expensive? We basically look up `labelToIndex` twice. It would be cool if we supported `lit(Map())` so we can do those lookups natively in SQL, and also `na.drop` together in wholestage codegen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253932 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"The number of input columns does not match output columns") + ($(inputCols), $(outputCols)) +} + } + + private def validateAndTransformField( + schema: StructType, + inputColName: String, + outputColName: String): StructField = { val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], s"The input column $inputColName must be either string type or numeric type, " + s"but got $inputDataType.") -val inputFields = schema.fields -val outputColName = $(outputCol) -require(inputFields.forall(_.name != outputColName), +require(schema.fields.forall(_.name != outputColName), s"Output column $outputColName already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() -StructType(outputFields) +NominalAttribute.defaultAttr.withName($(outputCol)).toStructField() + } + + /** Validates and transforms the input schema. 
*/ + protected def validateAndTransformSchema( + schema: StructType, + skipNonExistsCol: Boolean = false): StructType = { +val (inputColNames, outputColNames) = getInOutCols() + +val outputFields = for (i <- 0 until inputColNames.length) yield { + if (schema.fieldNames.contains(inputColNames(i))) { +validateAndTransformField(schema, inputColNames(i), outputColNames(i)) + } else { +if (skipNonExistsCol) { + null +} else { + throw new SparkException(s"Input column ${inputColNames(i)} does not exist.") +} + } +} +StructType(schema.fields ++ outputFields.filter(_ != null)) --- End diff -- Then you don't need to filter with the above code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257676 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { --- End diff -- Please add some comment about what is the invalid data. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183254078 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. +val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( --- End diff -- Possible to aggregate natively with SQL? I don't think we will compromise the performance with SQL aggregation like `groupBy` and `agg` and `countDistinct` without using tree aggregation since the states will be very small in this use-case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183258353 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) } +filteredDataset + } -val metadata = NominalAttribute.defaultAttr - .withName($(outputCol)).withValues(filteredLabels).toMetadata() -// If we are skipping invalid records, filter them out. 
-val (filteredDataset, keepInvalid) = $(handleInvalid) match { - case StringIndexer.SKIP_INVALID => -val filterer = udf { label: String => - labelToIndex.contains(label) -} - (dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol)))), false) - case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID) -} + private def getIndexer(labels: Seq[String], labelToIndex: OpenHashMap[String, Double]) = { +val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID) -val indexer = udf { label: String => +udf { label: String => --- End diff -- This requires calling many udf for different input columns. Should we combine them in one udf? The `filteredDataset` logic can be in as well to avoid multiple lookups. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257799 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) + } + filteredDataset = filteredDataset.where(filterer(dataset(inputColName))) --- End diff -- is it possible to not use `var filteredDataset`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253904 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { + (Array($(inputCol)), Array($(outputCol))) +} else { + require($(inputCols).length == $(outputCols).length, +"The number of input columns does not match output columns") + ($(inputCols), $(outputCols)) +} + } + + private def validateAndTransformField( + schema: StructType, + inputColName: String, + outputColName: String): StructField = { val inputDataType = schema(inputColName).dataType require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType], s"The input column $inputColName must be either string type or numeric type, " + s"but got $inputDataType.") -val inputFields = schema.fields -val outputColName = $(outputCol) -require(inputFields.forall(_.name != outputColName), +require(schema.fields.forall(_.name != outputColName), s"Output column $outputColName already exists.") -val attr = NominalAttribute.defaultAttr.withName($(outputCol)) -val outputFields = inputFields :+ attr.toStructField() -StructType(outputFields) +NominalAttribute.defaultAttr.withName($(outputCol)).toStructField() + } + + /** Validates and transforms the input schema. 
*/ + protected def validateAndTransformSchema( + schema: StructType, + skipNonExistsCol: Boolean = false): StructType = { +val (inputColNames, outputColNames) = getInOutCols() + +val outputFields = for (i <- 0 until inputColNames.length) yield { --- End diff -- Nit, why not the following for readability? ```scala val outputFields = inputColNames.zip(outputColNames).flatMap { case (inputColName, outputColName) => schema.fieldNames.contains(inputColName) match { case true => validateAndTransformField(schema, inputColName, outputColName) case false if skipNonExistsCol => None case _ => throw new SparkException(s"Input column $inputColName does not exist.") } } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r163528221 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -331,4 +357,51 @@ class StringIndexerSuite val dfWithIndex = model.transform(dfNoBristol) assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1) } + + test("StringIndexer multiple input columns") { --- End diff -- Good point. I added one test for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r163528239 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -33,12 +33,38 @@ class StringIndexerSuite test("params") { ParamsSuite.checkParams(new StringIndexer) -val model = new StringIndexerModel("indexer", Array("a", "b")) +val model = new StringIndexerModel("indexer", Array(Array("a", "b"))) val modelWithoutUid = new StringIndexerModel(Array("a", "b")) ParamsSuite.checkParams(model) ParamsSuite.checkParams(modelWithoutUid) } + test("params: input/output columns") { +val stringIndexerSingleCol = new StringIndexer() + .setInputCol("in").setOutputCol("out") +val inOutCols1 = stringIndexerSingleCol.getInOutCols() +assert(inOutCols1._1 === Array("in")) +assert(inOutCols1._2 === Array("out")) + +val stringIndexerMultiCol = new StringIndexer() + .setInputCols(Array("in1", "in2")).setOutputCols(Array("out1", "out2")) +val inOutCols2 = stringIndexerMultiCol.getInOutCols() +assert(inOutCols2._1 === Array("in1", "in2")) +assert(inOutCols2._2 === Array("out1", "out2")) + +intercept[IllegalArgumentException] { + new StringIndexer().setInputCol("in").setOutputCols(Array("out1", "out2")).getInOutCols() --- End diff -- Ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r162777500 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -249,6 +249,16 @@ object ParamValidators { def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { (value: Array[T]) => value.length > lowerBound } + + /** Check if more than one param in a set of exclusive params are set. */ + def checkExclusiveParams(model: Params, params: String*): Unit = { +if (params.filter(paramName => model.hasParam(paramName) && --- End diff -- The purpose of this method is to check if more than one Params are set among some exclusive Params within a Model. Is it useful to put an irrelevant Param into the exclusive Params to check? As we already know what Params the model has, it sounds like we want to check an irrelevant Param that we already know non-existing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r162715788 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -249,6 +249,16 @@ object ParamValidators { def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { (value: Array[T]) => value.length > lowerBound } + + /** Check if more than one param in a set of exclusive params are set. */ + def checkExclusiveParams(model: Params, params: String*): Unit = { +if (params.filter(paramName => model.hasParam(paramName) && --- End diff -- Why is this checking to see if the Param belongs to the Model? If this method is called with irrelevant Params, shouldn't it throw an error? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r161039325 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -33,12 +33,38 @@ class StringIndexerSuite test("params") { ParamsSuite.checkParams(new StringIndexer) -val model = new StringIndexerModel("indexer", Array("a", "b")) +val model = new StringIndexerModel("indexer", Array(Array("a", "b"))) val modelWithoutUid = new StringIndexerModel(Array("a", "b")) ParamsSuite.checkParams(model) ParamsSuite.checkParams(modelWithoutUid) } + test("params: input/output columns") { +val stringIndexerSingleCol = new StringIndexer() + .setInputCol("in").setOutputCol("out") +val inOutCols1 = stringIndexerSingleCol.getInOutCols() +assert(inOutCols1._1 === Array("in")) +assert(inOutCols1._2 === Array("out")) + +val stringIndexerMultiCol = new StringIndexer() + .setInputCols(Array("in1", "in2")).setOutputCols(Array("out1", "out2")) +val inOutCols2 = stringIndexerMultiCol.getInOutCols() +assert(inOutCols2._1 === Array("in1", "in2")) +assert(inOutCols2._2 === Array("out1", "out2")) + +intercept[IllegalArgumentException] { + new StringIndexer().setInputCol("in").setOutputCols(Array("out1", "out2")).getInOutCols() --- End diff -- It seems better that, use the way calling `stringIndexer.fit` and check exception thrown. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r161040131 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -249,6 +249,16 @@ object ParamValidators { def arrayLengthGt[T](lowerBound: Double): Array[T] => Boolean = { (value: Array[T]) => value.length > lowerBound } + + /** Check if more than one param in a set of exclusive params are set. */ + def checkExclusiveParams(model: Params, params: String*): Unit = { --- End diff -- @hhbyyh Is this the common helper methods you want for checking in/out single/multiple columns ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r161040537 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -331,4 +357,51 @@ class StringIndexerSuite val dfWithIndex = model.transform(dfNoBristol) assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1) } + + test("StringIndexer multiple input columns") { --- End diff -- Is there a test case, for testing `frequencyAsc/Desc`, when frequency equal, it will secondary sort by alphabets ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r160066219 --- Diff: R/pkg/tests/fulltests/test_mllib_regression.R --- @@ -126,15 +134,15 @@ test_that("spark.glm summary", { out <- capture.output(print(stats)) expect_match(out[2], "Deviance Residuals:") - expect_true(any(grepl("AIC: 59.22", out))) + expect_true(any(grepl("AIC: 35.84", out))) --- End diff -- R glm's AIC: 35.839: ```R > out <- capture.output(print(rStats)) > out [1] "" [2] "Call:" [3] "glm(formula = Sepal.Width ~ Sepal.Length + Species, data = dataset)" [4] "" [5] "Deviance Residuals: " [6] " 1234567 8 " [7] " 0. -1.4932 1.5491 0.5411 -0.8581 -1.2228 -0.5969 2.0809 " [8] "" [9] "Coefficients:" [10] " Estimate Std. Error t value Pr(>|t|)" [11] "(Intercept) 1.7150 2.0492 0.8370.450" [12] "Sepal.Length0.1925 0.5566 0.3460.747" [13] "Speciesversicolor 1.7894 1.9240 0.9300.405" [14] "Speciesvirginica1.2613 2.0735 0.6080.576" [15] "" [16] "(Dispersion parameter for gaussian family taken to be 2.960032)" [17] "" [18] "Null deviance: 14.719 on 7 degrees of freedom" [19] "Residual deviance: 11.840 on 4 degrees of freedom" [20] "AIC: 35.839" [21] "" [22] "Number of Fisher Scoring iterations: 2" [23] "" ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r160066433 --- Diff: R/pkg/tests/fulltests/test_mllib_regression.R --- @@ -174,17 +182,17 @@ test_that("spark.glm summary", { expect_equal(stats$aic, rStats$aic) # Test spark.glm works with offset - training <- suppressWarnings(createDataFrame(iris)) + training <- suppressWarnings(createDataFrame(dataset)) stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species, family = poisson(), offsetCol = "Petal_Length")) rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species, -data = iris, family = poisson(), offset = iris$Petal.Length))) +data = dataset, family = poisson(), offset = dataset$Petal.Length))) expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3)) # Test summary works on base GLM models - baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) + baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = dataset) baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) + expect_true(abs(baseSummary$deviance - 11.84013) < 1e-4) --- End diff -- R glm: ```R > baseSummary <- summary(stats::glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)) > baseSummary$deviance [1] 11.84013 ``` Spark glm: ```R > baseSummary <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) > baseSummary$deviance [1] 11.84013 ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r160049837 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) --- End diff -- Ran it again with the config: ```R > sparkR.conf("spark.sparkr.use.daemon") $spark.sparkr.use.daemon [1] "false" > start.time <- Sys.time() > model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) > end.time <- Sys.time() > time.taken <- end.time - start.time > time.taken Time difference of 1.704288 secs ``` ```R > start.time <- Sys.time() > model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) > end.time <- Sys.time() > time.taken <- end.time - start.time > time.taken Time difference of 5.135418 secs ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r160048655 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) --- End diff -- Ahh, @viirya, would you mind if I ask to check it after setting `spark.sparkr.use.daemon` to `false` too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r160039165 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) --- End diff -- ```R > start.time <- Sys.time() > model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) > end.time <- Sys.time() > time.taken <- end.time - start.time > time.taken Time difference of 1.780564 secs ``` ```R > start.time <- Sys.time() > model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) > end.time <- Sys.time() > time.taken <- end.time - start.time > time.taken Time difference of 5.728089 secs ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r159584417 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) --- End diff -- can you check if the run time increases significantly? this is an issue before - see SPARK-21693 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r159580805 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -313,7 +313,7 @@ test_that("spark.mlp", { # Test predict method --- End diff -- Actually I think we may remove this test `Test predict method`. Seems to me, with the `tol = 0.5`, the prediction may not be very meaningful. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r159579587 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -313,7 +313,7 @@ test_that("spark.mlp", { # Test predict method mlpTestDF <- df mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0")) + expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", "1.0", "1.0", "1.0")) --- End diff -- I checked the predictions. All `0.0` -> `1.0`, and all`1.0` -> `0.0`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r159578004 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) --- End diff -- Seems `maxIter = 10` is not stable. I increased to 100 to stabilize the predictions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r159577911 --- Diff: R/pkg/tests/fulltests/test_mllib_classification.R --- @@ -313,7 +313,7 @@ test_that("spark.mlp", { # Test predict method mlpTestDF <- df mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0")) + expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", "1.0", "1.0", "1.0")) --- End diff -- This is due to the change of how we sort string labels with same frequency under the setting of frequencyDesc/Asc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/20146 [SPARK-11215][ML] Add multiple columns support to StringIndexer ## What changes were proposed in this pull request? This takes over #19621 to add multi-column support to StringIndexer. ## How was this patch tested? Added tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-11215 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20146.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20146 commit bb990f1a6511d8ce20f4fff254dfe0ff43262a10 Author: Liang-Chi Hsieh Date: 2018-01-03T03:51:59Z Add multi-column support to StringIndexer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org