zhengruifeng commented on a change in pull request #31160: URL: https://github.com/apache/spark/pull/31160#discussion_r557272412
########## File path: mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala ########## @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.collection.mutable.ArrayBuilder + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute, NumericAttribute} +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasOutputCol} +import org.apache.spark.ml.stat.{ANOVATest, ChiSquareTest, FValueTest} +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{StructField, StructType} + + +/** + * Params for [[UnivariateFeatureSelector]] and [[UnivariateFeatureSelectorModel]]. + */ +private[feature] trait UnivariateFeatureSelectorParams extends Params + with HasFeaturesCol with HasLabelCol with HasOutputCol { + + /** + * The feature type. 
+ * Supported options: "categorical", "continuous" + * @group param + */ + @Since("3.1.1") + final val featureType = new Param[String](this, "featureType", + "Feature type. Supported options: categorical, continuous.", + ParamValidators.inArray(Array("categorical", "continuous"))) + + /** @group getParam */ + @Since("3.1.1") + def getFeatureType: String = $(featureType) + + /** + * The label type. + * Supported options: "categorical", "continuous" + * @group param + */ + @Since("3.1.1") + final val labelType = new Param[String](this, "labelType", + "Label type. Supported options: categorical, continuous.", + ParamValidators.inArray(Array("categorical", "continuous"))) + + /** @group getParam */ + @Since("3.1.1") + def getLabelType: String = $(labelType) + + /** + * The selection mode. + * Supported options: "numTopFeatures" (default), "percentile", "fpr", "fdr", "fwe" + * @group param + */ + @Since("3.1.1") + final val selectionMode = new Param[String](this, "selectionMode", + "The selection mode. Supported options: numTopFeatures, percentile, fpr, fdr, fwe", + ParamValidators.inArray(Array("numTopFeatures", "percentile", "fpr", "fdr", + "fwe"))) + + /** @group getParam */ + @Since("3.1.1") + def getSelectionMode: String = $(selectionMode) + + /** + * The upper bound of the features that selector will select. + * @group param + */ + @Since("3.1.1") + final val selectionThreshold = new DoubleParam(this, "selectionThreshold", + "The upper bound of the features that selector will select.") + + /** @group getParam */ + def getSelectionThreshold: Double = $(selectionThreshold) + + setDefault(selectionMode -> "numTopFeatures", selectionThreshold -> 50) +} + +/** + * The user can set `featureType` and labelType`, and Spark will pick the score function based on + * the specified `featureType` and labelType`. + * The following combination of `featureType` and `labelType` are supported: + * - `featureType` `categorical` and `labelType` `categorical`: Spark uses chi2. 
+ * - `featureType` `continuous` and `labelType` `categorical`: Spark uses f_classif. + * - `featureType` `continuous` and `labelType` `continuous`: Spark uses f_regression. + * + * The `UnivariateFeatureSelector` supports different selection modes: `numTopFeatures`, + * `percentile`, `fpr`, `fdr`, `fwe`. + * - `numTopFeatures` chooses a fixed number of top features according to a hypothesis. + * - `percentile` is similar but chooses a fraction of all features instead of a fixed number. + * - `fpr` chooses all features whose p-value are below a threshold, thus controlling the false + * positive rate of selection. + * - `fdr` uses the <a href= + * "https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure"> + * Benjamini-Hochberg procedure</a> + * to choose all features whose false discovery rate is below a threshold. + * - `fwe` chooses all features whose p-values are below a threshold. The threshold is scaled by + * 1/numFeatures, thus controlling the family-wise error rate of selection. + * + * By default, the selection mode is `numTopFeatures`, with the default selectionThreshold + * sets to 50. 
+ */ +@Since("3.1.1") +final class UnivariateFeatureSelector @Since("3.1.1")(@Since("3.1.1") override val uid: String) + extends Estimator[UnivariateFeatureSelectorModel] with UnivariateFeatureSelectorParams + with DefaultParamsWritable { + + @Since("3.1.1") + def this() = this(Identifiable.randomUID("UnivariateFeatureSelector")) + + /** @group setParam */ + @Since("3.1.1") + def setSelectionMode(value: String): this.type = set(selectionMode, value) + + /** @group setParam */ + @Since("3.1.1") + def setSelectionThreshold(value: Double): this.type = set(selectionThreshold, value) + + /** @group setParam */ + @Since("3.1.1") + def setFeatureType(value: String): this.type = set(featureType, value) + + /** @group setParam */ + @Since("3.1.1") + def setLabelType(value: String): this.type = set(labelType, value) + + /** @group setParam */ + @Since("3.1.1") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** @group setParam */ + @Since("3.1.1") + def setOutputCol(value: String): this.type = set(outputCol, value) + + /** @group setParam */ + @Since("3.1.1") + def setLabelCol(value: String): this.type = set(labelCol, value) + + @Since("3.1.1") + override def fit(dataset: Dataset[_]): UnivariateFeatureSelectorModel = { + transformSchema(dataset.schema, logging = true) + val spark = dataset.sparkSession + import spark.implicits._ + + val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol)) + + require(isSet(featureType) && isSet(labelType), "featureType and labelType need to be set") + val resultDF = ($(featureType), $(labelType)) match { + case ("categorical", "categorical") => + ChiSquareTest.test(dataset.toDF, getFeaturesCol, getLabelCol, true) + case ("continuous", "categorical") => + ANOVATest.test(dataset.toDF, getFeaturesCol, getLabelCol, true) + case ("continuous", "continuous") => + FValueTest.test(dataset.toDF, getFeaturesCol, getLabelCol, true) + case _ => + throw new IllegalArgumentException(s"Unsupported combination:" + 
+ s" featureType=${$(featureType)}, labelType=${$(labelType)}") + } + + def getTopIndices(k: Int): Array[Int] = { + resultDF.sort("pValue", "featureIndex") + .select("featureIndex") + .limit(k) + .as[Int] + .collect() + } + + val indices = $(selectionMode) match { + case "numTopFeatures" => + getTopIndices($(selectionThreshold).toInt) + case "percentile" => + getTopIndices((numFeatures * $(selectionThreshold)).toInt) + case "fpr" => + resultDF.select("featureIndex") + .where(col("pValue") < $(selectionThreshold)) + .as[Int].collect() + case "fdr" => + // This uses the Benjamini-Hochberg procedure. + // https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure + val f = $(selectionThreshold) / numFeatures + val maxIndex = resultDF.sort("pValue", "featureIndex") + .select("pValue") + .as[Double].rdd + .zipWithIndex + .flatMap { case (pValue, index) => + if (pValue <= f * (index + 1)) { + Iterator.single(index.toInt) + } else Iterator.empty + }.fold(-1)(math.max) Review comment: this RDD may be empty: ``` scala> sc.range(0, 0).max java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD.$anonfun$reduce$4(RDD.scala:1096) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1096) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:388) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1076) at org.apache.spark.rdd.RDD.$anonfun$max$1(RDD.scala:1511) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:388) at org.apache.spark.rdd.RDD.max(RDD.scala:1511) ...
47 elided ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
