rebo16v commented on code in PR #48347:
URL: https://github.com/apache/spark/pull/48347#discussion_r1793050272

##########
mllib/src/main/scala/org/apache/spark/ml/feature/TargetEncoder.scala:
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.collection.immutable.ArraySeq
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{SparkException, SparkRuntimeException}
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute.NominalAttribute
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+/** Private trait for params and common methods for TargetEncoder and TargetEncoderModel */
+private[ml] trait TargetEncoderBase extends Params with HasLabelCol
+  with HasInputCol with HasInputCols with HasOutputCol with HasOutputCols with HasHandleInvalid {
+
+  /**
+   * Param for how to handle invalid data during transform().
+   * Options are 'keep' (invalid data presented as an extra categorical feature) or
+   * 'error' (throw an error).
+   * Note that this Param is only used during transform; during fitting, invalid data
+   * will result in an error.
+   * Default: "error"
+   * @group param
+   */
+  @Since("4.0.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "How to handle invalid data during transform(). " +
+    "Options are 'keep' (invalid data presented as an extra categorical feature) " +
+    "or 'error' (throw an error). Note that this Param is only used during transform; " +
+    "during fitting, invalid data will result in an error.",
+    ParamValidators.inArray(TargetEncoder.supportedHandleInvalids))
+
+  setDefault(handleInvalid -> TargetEncoder.ERROR_INVALID)
+
+  @Since("4.0.0")
+  val targetType: Param[String] = new Param[String](this, "targetType",
+    "Type of label considered during fit(). " +
+    "Options are 'binary' and 'continuous'. When 'binary', categories are encoded as the " +
+    "conditional probability of the target given the category; when 'continuous', " +
+    "categories are encoded as the average of the target given the category.",
+    ParamValidators.inArray(TargetEncoder.supportedTargetTypes))
+
+  setDefault(targetType -> TargetEncoder.TARGET_BINARY)
+
+  final def getTargetType: String = $(targetType)
+
+  @Since("4.0.0")
+  val smoothing: DoubleParam = new DoubleParam(this, "smoothing",
+    "Smoothing factor for the encodings. Controls how in-category stats and " +
+    "overall stats are weighted.",
+    ParamValidators.gtEq(0.0))
+
+  setDefault(smoothing -> 0.0)
+
+  final def getSmoothing: Double = $(smoothing)
+
+  private[feature] lazy val inputFeatures = if (isSet(inputCol)) Array($(inputCol))
+    else if (isSet(inputCols)) $(inputCols)
+    else Array.empty[String]
+
+  private[feature] lazy val outputFeatures = if (isSet(outputCol)) Array($(outputCol))
+    else if (isSet(outputCols)) $(outputCols)
+    else inputFeatures.map { field => s"${field}_indexed" }
+
+  private[feature] def validateSchema(schema: StructType,
+      fitting: Boolean): StructType = {
+
+    require(inputFeatures.length > 0,
+      "At least one input column must be specified.")
+
+    require(inputFeatures.length == outputFeatures.length,
+      s"The number of input columns ${inputFeatures.length} must be the same as the number of " +
+      s"output columns ${outputFeatures.length}.")
+
+    val features = if (fitting) inputFeatures :+ $(labelCol)
+      else inputFeatures
+
+    features.foreach { feature =>
+      try {
+        val field = schema(feature)
+        if (field.dataType != DoubleType) {
+          throw new SparkException(s"Data type for column ${feature} is ${field.dataType}" +
+            s", but ${DoubleType.typeName} is required.")
+        }
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new SparkException(s"No column named ${feature} found in the dataset.")
+      }
+    }
+    schema
+  }
+
+}
+
+/**
+ * Target Encoding maps a column of categorical indices into a numerical feature derived
+ * from the target.
+ *
+ * When `handleInvalid` is configured to 'keep', previously unseen values of a feature
+ * are mapped to the dataset's overall statistics.
+ *
+ * When 'targetType' is configured to 'binary', categories are encoded as the conditional
+ * probability of the target given that category (bin counting).
+ * When 'targetType' is configured to 'continuous', categories are encoded as the average
+ * of the target given that category (mean encoding).
+ *
+ * Parameter 'smoothing' controls how in-category stats and overall stats are weighted.
+ *
+ * @note When encoding multiple columns with the `inputCols` and `outputCols` params,
+ * input/output cols come in pairs, specified by the order in the arrays, and each pair
+ * is treated independently.
+ * + * @see `StringIndexer` for converting categorical values into category indices + */ +@Since("4.0.0") +class TargetEncoder @Since("4.0.0") (@Since("4.0.0") override val uid: String) + extends Estimator[TargetEncoderModel] with TargetEncoderBase with DefaultParamsWritable { + + @Since("4.0.0") + def this() = this(Identifiable.randomUID("TargetEncoder")) + + /** @group setParam */ + @Since("4.0.0") + def setLabelCol(value: String): this.type = set(labelCol, value) + + /** @group setParam */ + @Since("4.0.0") + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + @Since("4.0.0") + def setOutputCol(value: String): this.type = set(outputCol, value) + + /** @group setParam */ + @Since("4.0.0") + def setInputCols(values: Array[String]): this.type = set(inputCols, values) + + /** @group setParam */ + @Since("4.0.0") + def setOutputCols(values: Array[String]): this.type = set(outputCols, values) + + /** @group setParam */ + @Since("4.0.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** @group setParam */ + @Since("4.0.0") + def setTargetType(value: String): this.type = set(targetType, value) + + /** @group setParam */ + @Since("4.0.0") + def setSmoothing(value: Double): this.type = set(smoothing, value) + + @Since("4.0.0") + override def transformSchema(schema: StructType): StructType = { + validateSchema(schema, fitting = true) + } + + @Since("4.0.0") + override def fit(dataset: Dataset[_]): TargetEncoderModel = { + validateSchema(dataset.schema, fitting = true) + + val stats = dataset + .select(ArraySeq.unsafeWrapArray( + (inputFeatures :+ $(labelCol)).map(col)): _*) + .rdd + .treeAggregate( + Array.fill(inputFeatures.length) { + Map.empty[Double, (Double, Double)] + })( + (agg, row: Row) => { + val label = row.getDouble(inputFeatures.length) + Range(0, inputFeatures.length).map { + feature => try { + val value = row.getDouble(feature) + if (value < 0.0 || value != value.toInt) throw new SparkException( + s"Values from column ${inputFeatures(feature)} must be indices, but got $value.") + val counter = agg(feature).getOrElse(value, (0.0, 0.0)) + val globalCounter = agg(feature).getOrElse(TargetEncoder.UNSEEN_CATEGORY, (0.0, 0.0)) + $(targetType) match { + case TargetEncoder.TARGET_BINARY => + if (label == 1.0) agg(feature) + + (value -> (1 + counter._1, 1 + counter._2)) + + (TargetEncoder.UNSEEN_CATEGORY -> (1 + globalCounter._1, 1 + globalCounter._2)) + else if (label == 0.0) agg(feature) + + (value -> (1 + counter._1, counter._2)) + + (TargetEncoder.UNSEEN_CATEGORY -> (1 + globalCounter._1, globalCounter._2)) + else throw new SparkException( + s"Values from column ${getLabelCol} must be binary (0,1) but got $label.") + case TargetEncoder.TARGET_CONTINUOUS => agg(feature) + + (value -> (1 + counter._1, + counter._2 + ((label - counter._2) / (1 + counter._1)))) + + (TargetEncoder.UNSEEN_CATEGORY -> (1 + globalCounter._1, + globalCounter._2 + ((label - globalCounter._2) / (1 + globalCounter._1)))) + } + } catch { + case e: SparkRuntimeException => + if (e.getErrorClass == "ROW_VALUE_IS_NULL") { + throw new SparkException(s"Null value found in feature ${inputFeatures(feature)}." 
+ + s" See Imputer estimator for completing missing values.") + } else throw e + } + }.toArray + }, + (agg1, agg2) => Range(0, inputFeatures.length) + .map { + feature => { + val values = agg1(feature).keySet ++ agg2(feature).keySet + values.map(value => + value -> { + val stat1 = agg1(feature).getOrElse(value, (0.0, 0.0)) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

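Finally, for anyone who wants to try the estimator end to end, a minimal usage sketch against the API added in this PR. It is hypothetical in two ways: the DataFrame and column names are toy data, and the literals "binary" and "keep" assume that TargetEncoder.TARGET_BINARY and the 'keep' option of handleInvalid carry those string values, as the param docs suggest.

    import org.apache.spark.ml.feature.TargetEncoder
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Both the category indices and the label must be DoubleType,
    // per validateSchema() in the diff.
    val train = Seq(
      (0.0, 1.0), (0.0, 0.0), (1.0, 1.0), (1.0, 1.0), (2.0, 0.0)
    ).toDF("category", "label")

    val encoder = new TargetEncoder()
      .setInputCol("category")
      .setOutputCol("category_encoded")
      .setLabelCol("label")
      .setTargetType("binary")  // encode each category as P(label = 1 | category)
      .setSmoothing(1.0)        // blend per-category stats with global stats
      .setHandleInvalid("keep") // map unseen categories to the global stats

    val model = encoder.fit(train)
    model.transform(train).show()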