rebo16v commented on code in PR #48347:
URL: https://github.com/apache/spark/pull/48347#discussion_r1803769041


##########
mllib/src/main/scala/org/apache/spark/ml/feature/TargetEncoder.scala:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute.NominalAttribute
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+/** Private trait for params and common methods for TargetEncoder and TargetEncoderModel */
+private[ml] trait TargetEncoderBase extends Params with HasLabelCol
+  with HasInputCol with HasInputCols with HasOutputCol with HasOutputCols with HasHandleInvalid {
+
+  /**
+   * Param for how to handle invalid data during transform().
+   * Options are 'keep' (invalid data presented as an extra categorical feature) or
+   * 'error' (throw an error).
+   * Note that this Param is only used during transform; during fitting, invalid data
+   * will result in an error.
+   * Default: "error"
+   * @group param
+   */
+  @Since("4.0.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "How to handle invalid data during transform(). " +
+      "Options are 'keep' (invalid data presented as an extra categorical feature) " +
+      "or 'error' (throw an error). Note that this Param is only used during transform; " +
+      "during fitting, invalid data will result in an error.",
+    ParamValidators.inArray(TargetEncoder.supportedHandleInvalids))
+
+  setDefault(handleInvalid -> TargetEncoder.ERROR_INVALID)
+
+  @Since("4.0.0")
+  val targetType: Param[String] = new Param[String](this, "targetType",
+    "How to handle invalid data during transform(). " +
+      "Options are 'keep' (invalid data presented as an extra categorical 
feature) " +
+      "or error (throw an error). Note that this Param is only used during 
transform; " +
+      "during fitting, invalid data will result in an error.",
+    ParamValidators.inArray(TargetEncoder.supportedTargetTypes))
+
+  setDefault(targetType -> TargetEncoder.TARGET_BINARY)
+
+  final def getTargetType: String = $(targetType)
+
+  @Since("4.0.0")
+  val smoothing: DoubleParam = new DoubleParam(this, "smoothing",
+    "Smoothing factor for blending in-category statistics with overall statistics. " +
+      "Higher values pull encodings towards the overall statistic. Must be >= 0.",
+    ParamValidators.gtEq(0.0))
+
+  setDefault(smoothing -> 0.0)
+
+  final def getSmoothing: Double = $(smoothing)
+
+  private[feature] lazy val inputFeatures = if (isSet(inputCol)) Array($(inputCol))
+                                            else if (isSet(inputCols)) $(inputCols)
+                                            else Array.empty[String]
+
+  private[feature] lazy val outputFeatures = if (isSet(outputCol)) Array($(outputCol))
+                                             else if (isSet(outputCols)) $(outputCols)
+                                             else inputFeatures.map { field: String => s"${field}_indexed" }
+
+  private[feature] def validateSchema(schema: StructType,
+                                      fitting: Boolean): StructType = {
+
+    require(inputFeatures.length > 0,
+      s"At least one input column must be specified.")
+
+    require(inputFeatures.length == outputFeatures.length,
+      s"The number of input columns ${inputFeatures.length} must be the same as the number of " +
+        s"output columns ${outputFeatures.length}.")
+
+    val features = if (fitting) inputFeatures :+ $(labelCol)
+    else inputFeatures
+
+    features.foreach {
+      feature => {
+        try {
+          val field = schema(feature)
+          if (!field.dataType.isInstanceOf[NumericType]) {
+            throw new SparkException(s"Data type for column ${feature} is 
${field.dataType}" +
+              s", but a subclass of ${NumericType} is required.")
+          }
+        } catch {
+          case _: IllegalArgumentException =>
+            throw new SparkException(s"No column named ${feature} found on dataset.")
+        }
+      }
+    }
+    schema
+  }
+
+}
+
+/**
+ * Target Encoding maps a column of categorical indices into a numerical feature derived
+ * from the target.
+ *
+ * When `handleInvalid` is configured to 'keep', previously unseen values of a feature
+ * are mapped to the dataset overall statistics.
+ *
+ * When 'targetType' is configured to 'binary', categories are encoded as the conditional
+ * probability of the target given that category (bin counting).
+ * When 'targetType' is configured to 'continuous', categories are encoded as the average
+ * of the target given that category (mean encoding).
+ *
+ * Parameter 'smoothing' controls how in-category stats and overall stats are weighted.
+ *
+ * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols
+ * come in pairs, specified by the order in the arrays, and each pair is treated independently.
+ *
+ * @see `StringIndexer` for converting categorical values into category indices
+ */
+@Since("4.0.0")
+class TargetEncoder @Since("4.0.0") (@Since("4.0.0") override val uid: String)
+  extends Estimator[TargetEncoderModel] with TargetEncoderBase with DefaultParamsWritable {
+
+  @Since("4.0.0")
+  def this() = this(Identifiable.randomUID("TargetEncoder"))
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setLabelCol(value: String): this.type = set(labelCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setInputCols(values: Array[String]): this.type = set(inputCols, values)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setTargetType(value: String): this.type = set(targetType, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setSmoothing(value: Double): this.type = set(smoothing, value)
+
+  @Since("4.0.0")
+  override def transformSchema(schema: StructType): StructType = {
+    validateSchema(schema, fitting = true)
+  }
+
+  @Since("4.0.0")
+  override def fit(dataset: Dataset[_]): TargetEncoderModel = {
+    validateSchema(dataset.schema, fitting = true)
+
+    val feature_types = inputFeatures.map{
+      feature => dataset.schema(feature).dataType
+    }
+    val label_type = dataset.schema($(labelCol)).dataType
+
+    val stats = dataset
+      .select((inputFeatures :+ $(labelCol)).map(col).toIndexedSeq: _*)
+      .rdd.treeAggregate(
+        Array.fill(inputFeatures.length) {
+          Map.empty[Option[Double], (Double, Double)]
+        })(
+        (agg, row: Row) => if (!row.isNullAt(inputFeatures.length)) {
+          val label = label_type match {
+            case ByteType => row.getByte(inputFeatures.length).toDouble
+            case ShortType => row.getShort(inputFeatures.length).toDouble
+            case IntegerType => row.getInt(inputFeatures.length).toDouble
+            case LongType => row.getLong(inputFeatures.length).toDouble
+            case DoubleType => row.getDouble(inputFeatures.length)
+          }
+          inputFeatures.indices.map {
+            feature => {
+              val category: Option[Double] = {
+                if (row.isNullAt(feature)) None // null category
+                else {
+                  val value: Double = feature_types(feature) match {
+                    case ByteType => row.getByte(feature).toDouble
+                    case ShortType => row.getShort(feature).toDouble
+                    case IntegerType => row.getInt(feature).toDouble
+                    case LongType => row.getLong(feature).toDouble
+                    case DoubleType => row.getDouble(feature)
+                  }
+                  if (value < 0.0 || value != value.toInt) throw new 
SparkException(
+                    s"Values from column ${inputFeatures(feature)} must be 
indices, " +
+                      s"but got $value.")
+                  else Some(value)
+                }
+              }
+              val (class_count, class_stat) = agg(feature).getOrElse(category, (0.0, 0.0))
+              val (global_count, global_stat) =
+                agg(feature).getOrElse(TargetEncoder.UNSEEN_CATEGORY, (0.0, 0.0))
+              $(targetType) match {
+                case TargetEncoder.TARGET_BINARY => // counting
+                  if (label == 1.0) {
+                    agg(feature) +
+                      (category -> (1 + class_count, 1 + class_stat)) +
+                      (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count, 1 + global_stat))
+                  } else if (label == 0.0) {
+                    agg(feature) +
+                      (category -> (1 + class_count, class_stat)) +
+                      (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count, global_stat))
+                  } else throw new SparkException(
+                    s"Values from column ${getLabelCol} must be binary (0,1) but got $label.")
+                case TargetEncoder.TARGET_CONTINUOUS => // incremental mean
+                  agg(feature) +
+                    (category -> (1 + class_count,
+                      class_stat + ((label - class_stat) / (1 + class_count)))) +
+                    (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count,
+                      global_stat + ((label - global_stat) / (1 + global_count))))
+              }
+            }
+          }.toArray
+        } else agg,  // ignore null-labeled observations
+        (agg1, agg2) => inputFeatures.indices.map {
+          feature => {
+            val categories = agg1(feature).keySet ++ agg2(feature).keySet
+            categories.map(category =>
+              category -> {
+                val (counter1, stat1) = agg1(feature).getOrElse(category, (0.0, 0.0))
+                val (counter2, stat2) = agg2(feature).getOrElse(category, (0.0, 0.0))
+                $(targetType) match {
+                  case TargetEncoder.TARGET_BINARY => (counter1 + counter2, stat1 + stat2)
+                  case TargetEncoder.TARGET_CONTINUOUS => (counter1 + counter2,
+                    ((counter1 * stat1) + (counter2 * stat2)) / (counter1 + counter2))
+                }
+              }).toMap
+          }
+        }.toArray)
+
+    // encodings: Map[feature, Map[Some(category), encoding]]
+    val encodings: Map[String, Map[Option[Double], Double]] =
+      inputFeatures.zip(stats).map {
+        case (feature, stat) =>
+          val (global_count, global_stat) = stat(TargetEncoder.UNSEEN_CATEGORY)
+          feature -> stat.map {
+            case (cat, (class_count, class_stat)) => cat -> {
+              val weight = class_count / (class_count + $(smoothing))
+              $(targetType) match {
+                case TargetEncoder.TARGET_BINARY =>

Review Comment:
   done
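
For context on the smoothing logic under review: the diff above computes
`weight = class_count / (class_count + smoothing)` and keeps per-category and
overall statistics side by side. A minimal standalone sketch of that blending
(hypothetical names; the exact expression is truncated in this diff, so this
assumes the standard `weight * classStat + (1 - weight) * globalStat` form):

    object SmoothingSketch {
      // classStat is the in-category mean (or positive rate for a binary target);
      // globalStat is the same statistic computed over the whole dataset.
      def smoothedEncoding(classCount: Double, classStat: Double,
                           globalStat: Double, smoothing: Double): Double = {
        val weight = classCount / (classCount + smoothing)  // as in the diff above
        weight * classStat + (1 - weight) * globalStat
      }

      def main(args: Array[String]): Unit = {
        // A rare category (3 rows) is pulled towards the global statistic ...
        println(smoothedEncoding(3, 1.0, 0.2, 10.0))    // ~0.385
        // ... while a frequent category keeps roughly its own statistic.
        println(smoothedEncoding(1000, 1.0, 0.2, 10.0)) // ~0.992
      }
    }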



##########
docs/ml-features.md:
##########
@@ -855,6 +855,58 @@ for more details on the API.
 
 </div>
 
+## TargetEncoder
+
+Target Encoding maps a column of categorical indices into a numerical feature derived from the target.
+Leveraging the relationship between categorical features and the target variable, target encoding usually performs better than one-hot encoding (while avoiding the need to add extra columns).
+
+`TargetEncoder` can transform multiple columns, returning a single target-encoded output column for each input column.
+Users can specify input and output column names by setting `inputCol` and `outputCol` for single-column use cases, or `inputCols` and `outputCols` for multi-column use cases (both arrays are required to have the same size).
+
+Users can specify the target column name by setting `labelCol`. This column contains the ground-truth labels from which encodings will be derived.
+
+`TargetEncoder` supports the `handleInvalid` parameter to choose how to handle invalid input when transforming data.
+Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
+
+`TargetEncoder` supports the `targetType` parameter to choose the label type when fitting data, affecting how statistics are calculated.
+Available options include 'binary' and 'continuous'.
+When set to 'binary', encodings will be fitted from target conditional probabilities (a.k.a. bin-counting).
+When set to 'continuous', encodings will be fitted according to the target mean (a.k.a. mean-encoding).

Review Comment:
   done
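
A minimal usage sketch of the API added in this PR (data and column names are
made up for illustration; assumes an active SparkSession `spark`):

    import org.apache.spark.ml.feature.TargetEncoder

    val df = spark.createDataFrame(Seq(
      (0.0, 1.0),  // (categorical index, binary label)
      (1.0, 0.0),
      (1.0, 1.0),
      (0.0, 1.0)
    )).toDF("category", "label")

    val encoder = new TargetEncoder()
      .setInputCol("category")
      .setOutputCol("encoded")
      .setLabelCol("label")
      .setTargetType("binary")   // encode as P(label = 1 | category)
      .setSmoothing(1.0)         // blend category stats with the global rate
      .setHandleInvalid("keep")  // map unseen categories to the overall statistic

    val model = encoder.fit(df)
    model.transform(df).show()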



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

