srowen commented on code in PR #48347: URL: https://github.com/apache/spark/pull/48347#discussion_r1794186169
########## docs/ml-features.md: ##########
@@ -855,6 +855,58 @@ for more details on the API.
 </div>
 
+## TargetEncoder
+
+Target Encoding maps a column of categorical indices into a numerical feature derived from the target.
+Leveraging the relationship between categorical features and the target variable, target encoding usually performs better than one-hot encoding (while avoiding the need to add extra columns).
+
+`TargetEncoder` can transform multiple columns, returning a single target-encoded output column for each input column.
+Users can specify input and output column names by setting `inputCol` and `outputCol` for single-column use cases, or `inputCols` and `outputCols` for multi-column use cases (both arrays must have the same size).
+
+Users can specify the target column name by setting `label`. This column contains the ground-truth labels from which encodings will be derived.
+
+`TargetEncoder` supports the `handleInvalid` parameter to choose how to handle invalid input when transforming data.
+Available options include 'keep' (any invalid inputs are assigned to an extra categorical index) and 'error' (throw an error).
+
+`TargetEncoder` supports the `targetType` parameter to choose the label type when fitting data, affecting how statistics are calculated.
+Available options include 'binary' and 'continuous'.
+When set to 'binary', encodings will be fitted from target conditional probabilities (a.k.a. bin-counting).
+When set to 'continuous', encodings will be fitted according to the target mean (a.k.a. mean-encoding).

Review Comment:
   I think you can describe this a little bit more somewhere, could be here or at the top - what does target encoding actually do? with a simplistic example of a few rows? Just want to make it immediately clear in 1 paragraph what this is doing for binary vs continuous targets
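   One possible shape for such an example (a minimal sketch, not part of the PR; it uses the setters defined in this PR, assumes an active SparkSession named `spark`, and works the values out by hand under the default smoothing = 0):

       import org.apache.spark.ml.feature.TargetEncoder

       // Toy data: a categorical index and a binary label.
       val df = spark.createDataFrame(Seq(
         (0.0, 1.0), (0.0, 1.0), (0.0, 0.0),  // category 0: 2 positives out of 3
         (1.0, 0.0), (1.0, 0.0)               // category 1: 0 positives out of 2
       )).toDF("category", "label")

       val encoder = new TargetEncoder()
         .setInputCol("category")
         .setOutputCol("encoded")
         .setLabelCol("label")
         .setTargetType("binary")

       // With targetType = 'binary' and smoothing = 0, category 0 maps to 2/3
       // (its positive rate) and category 1 maps to 0.0. With 'continuous',
       // each category would instead map to the mean of its label values.
       val encoded = encoder.fit(df).transform(df)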
########## mllib/src/main/scala/org/apache/spark/ml/feature/TargetEncoder.scala: ##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute.NominalAttribute
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+/** Private trait for params and common methods for TargetEncoder and TargetEncoderModel */
+private[ml] trait TargetEncoderBase extends Params with HasLabelCol
+  with HasInputCol with HasInputCols with HasOutputCol with HasOutputCols with HasHandleInvalid {
+
+  /**
+   * Param for how to handle invalid data during transform().
+   * Options are 'keep' (invalid data presented as an extra categorical feature) or
+   * 'error' (throw an error).
+   * Note that this Param is only used during transform; during fitting, invalid data
+   * will result in an error.
+   * Default: "error"
+   * @group param
+   */
+  @Since("4.0.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "How to handle invalid data during transform(). " +
+    "Options are 'keep' (invalid data presented as an extra categorical feature) " +
+    "or 'error' (throw an error). Note that this Param is only used during transform; " +
+    "during fitting, invalid data will result in an error.",
+    ParamValidators.inArray(TargetEncoder.supportedHandleInvalids))
+
+  setDefault(handleInvalid -> TargetEncoder.ERROR_INVALID)
+
+  @Since("4.0.0")
+  val targetType: Param[String] = new Param[String](this, "targetType",
+    "Type of label considered during fit(). " +
+    "Options are 'binary' and 'continuous'. When 'binary', encodings are fitted from " +
+    "target conditional probabilities; when 'continuous', from target means.",
+    ParamValidators.inArray(TargetEncoder.supportedTargetTypes))
+
+  setDefault(targetType -> TargetEncoder.TARGET_BINARY)
+
+  final def getTargetType: String = $(targetType)
+
+  @Since("4.0.0")
+  val smoothing: DoubleParam = new DoubleParam(this, "smoothing",
+    "Smoothing factor for encodings. Smoothing blends in-category estimates with " +
+    "overall estimates according to the relative size of the category.",
+    ParamValidators.gtEq(0.0))
+
+  setDefault(smoothing -> 0.0)
+
+  final def getSmoothing: Double = $(smoothing)
+
+  private[feature] lazy val inputFeatures = if (isSet(inputCol)) Array($(inputCol))
+    else if (isSet(inputCols)) $(inputCols)
+    else Array.empty[String]
+
+  private[feature] lazy val outputFeatures = if (isSet(outputCol)) Array($(outputCol))
+    else if (isSet(outputCols)) $(outputCols)
+    else inputFeatures.map { field: String => s"${field}_indexed" }
+
+  private[feature] def validateSchema(schema: StructType,
+                                      fitting: Boolean): StructType = {
+
+    require(inputFeatures.length > 0,
+      s"At least one input column must be specified.")
+
+    require(inputFeatures.length == outputFeatures.length,
+      s"The number of input columns ${inputFeatures.length} must be the same as the number of " +
+      s"output columns ${outputFeatures.length}.")
+
+    val features = if (fitting) inputFeatures :+ $(labelCol)
+    else inputFeatures
+
+    features.foreach {
+      feature => {
+        try {
+          val field = schema(feature)
+          if (!field.dataType.isInstanceOf[NumericType]) {
+            throw new SparkException(s"Data type for column ${feature} is ${field.dataType}" +
+              s", but a subclass of ${NumericType} is required.")
+          }
+        } catch {
+          case e: IllegalArgumentException =>
+            throw new SparkException(s"No column named ${feature} found on dataset.")
+        }
+      }
+    }
+    schema
+  }
+
+}
+
+/**
+ * Target Encoding maps a column of categorical indices into a numerical feature derived
+ * from the target.
+ *
+ * When `handleInvalid` is configured to 'keep', previously unseen values of a feature
+ * are mapped to the dataset's overall statistics.
+ *
+ * When 'targetType' is configured to 'binary', categories are encoded as the conditional
+ * probability of the target given that category (bin counting).
+ * When 'targetType' is configured to 'continuous', categories are encoded as the average
+ * of the target given that category (mean encoding).
+ *
+ * Parameter 'smoothing' controls how in-category stats and overall stats are weighted.
+ *
+ * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols
+ * come in pairs, specified by the order in the arrays, and each pair is treated independently.
+ *
+ * @see `StringIndexer` for converting categorical values into category indices
+ */
+@Since("4.0.0")
+class TargetEncoder @Since("4.0.0") (@Since("4.0.0") override val uid: String)
+  extends Estimator[TargetEncoderModel] with TargetEncoderBase with DefaultParamsWritable {
+
+  @Since("4.0.0")
+  def this() = this(Identifiable.randomUID("TargetEncoder"))
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setLabelCol(value: String): this.type = set(labelCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setInputCols(values: Array[String]): this.type = set(inputCols, values)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setOutputCols(values: Array[String]): this.type = set(outputCols, values)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setTargetType(value: String): this.type = set(targetType, value)
+
+  /** @group setParam */
+  @Since("4.0.0")
+  def setSmoothing(value: Double): this.type = set(smoothing, value)
+
+  @Since("4.0.0")
+  override def transformSchema(schema: StructType): StructType = {
+    validateSchema(schema, fitting = true)
+  }
+
+  @Since("4.0.0")
+  override def fit(dataset: Dataset[_]): TargetEncoderModel = {
+    validateSchema(dataset.schema, fitting = true)
+
+    val feature_types = inputFeatures.map {
+      feature => dataset.schema(feature).dataType
+    }
+    val label_type = dataset.schema($(labelCol)).dataType
+
+    val stats = dataset
+      .select((inputFeatures :+ $(labelCol)).map(col).toIndexedSeq: _*)
+      .rdd.treeAggregate(
+        Array.fill(inputFeatures.length) {
+          Map.empty[Option[Double], (Double, Double)]
+        })(
+        (agg, row: Row) => if (!row.isNullAt(inputFeatures.length)) {

Review Comment:
   Ah, if the label is null, hm, that could be an error. I don't think it makes sense to have input to any supervised problem where the label is missing. Here you'd be welcome to copy whatever other similar classes do in this situation
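   One way to make that concrete (a sketch of a fail-fast pre-check, names hypothetical and not necessarily what similar classes do; erroring inside the seqOp itself would avoid the extra pass over the data):

       import org.apache.spark.SparkException
       import org.apache.spark.sql.Dataset

       // Sketch: reject null labels up front instead of silently skipping
       // those rows during aggregation.
       def requireNoNullLabels(dataset: Dataset[_], labelCol: String): Unit = {
         val nullCount = dataset.filter(dataset.col(labelCol).isNull).count()
         if (nullCount > 0) {
           throw new SparkException(
             s"Label column $labelCol contains $nullCount null value(s).")
         }
       }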
########## mllib/src/main/scala/org/apache/spark/ml/feature/TargetEncoder.scala: ##########
@@ -0,0 +1,442 @@
+        (agg, row: Row) => if (!row.isNullAt(inputFeatures.length)) {
+          val label = label_type match {
+            case ByteType => row.getByte(inputFeatures.length).toDouble
+            case ShortType => row.getShort(inputFeatures.length).toDouble
+            case IntegerType => row.getInt(inputFeatures.length).toDouble
+            case LongType => row.getLong(inputFeatures.length).toDouble
+            case DoubleType => row.getDouble(inputFeatures.length)
+          }
+          inputFeatures.indices.map {
+            feature => {
+              val category: Option[Double] = {
+                if (row.isNullAt(feature)) None // null category

Review Comment:
   Here it doesn't seem like it's ignoring inputs where feature i is null, but reading it as a category None. Maybe I misread the rest, but it seems like it proceeds anyway. I don't think that's crazy, but per your earlier comments, thought the intent might be to ignore these
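   A toy, plain-Scala illustration of the two semantics being discussed (not the PR's code; binary target, smoothing = 0): treating null as its own category gives it a fitted encoding, while ignoring nulls would simply drop those rows from the per-column statistics.

       // (category, label) pairs; None stands for a null feature value.
       val rows: Seq[(Option[Double], Double)] = Seq(
         (Some(0.0), 1.0), (Some(0.0), 0.0),      // category 0 -> 0.5
         (Some(1.0), 1.0), (Some(1.0), 1.0),      // category 1 -> 1.0
         (None, 0.0), (None, 0.0), (None, 1.0))   // null category -> 1/3

       // Positive rate per category, with None keyed like any other category.
       val encodings: Map[Option[Double], Double] =
         rows.groupBy(_._1).map { case (cat, rs) =>
           cat -> rs.map(_._2).sum / rs.size
         }
       // Map(Some(0.0) -> 0.5, Some(1.0) -> 1.0, None -> 0.333...)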
########## mllib/src/main/scala/org/apache/spark/ml/feature/TargetEncoder.scala: ##########
@@ -0,0 +1,442 @@
+              val category: Option[Double] = {
+                if (row.isNullAt(feature)) None // null category
+                else {
+                  val value: Double = feature_types(feature) match {
+                    case ByteType => row.getByte(feature).toDouble
+                    case ShortType => row.getShort(feature).toDouble
+                    case IntegerType => row.getInt(feature).toDouble
+                    case LongType => row.getLong(feature).toDouble
+                    case DoubleType => row.getDouble(feature)
+                  }
+                  if (value < 0.0 || value != value.toInt) throw new SparkException(
+                    s"Values from column ${inputFeatures(feature)} must be indices, " +
+                    s"but got $value.")
+                  else Some(value)
+                }
+              }
+              val (class_count, class_stat) = agg(feature).getOrElse(category, (0.0, 0.0))
+              val (global_count, global_stat) =
+                agg(feature).getOrElse(TargetEncoder.UNSEEN_CATEGORY, (0.0, 0.0))
+              $(targetType) match {
+                case TargetEncoder.TARGET_BINARY => // counting
+                  if (label == 1.0) {
+                    agg(feature) +
+                      (category -> (1 + class_count, 1 + class_stat)) +
+                      (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count, 1 + global_stat))
+                  } else if (label == 0.0) {
+                    agg(feature) +
+                      (category -> (1 + class_count, class_stat)) +
+                      (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count, global_stat))
+                  } else throw new SparkException(
+                    s"Values from column ${getLabelCol} must be binary (0,1) but got $label.")
+                case TargetEncoder.TARGET_CONTINUOUS => // incremental mean
+                  agg(feature) +
+                    (category -> (1 + class_count,
+                      class_stat + ((label - class_stat) / (1 + class_count)))) +
+                    (TargetEncoder.UNSEEN_CATEGORY -> (1 + global_count,
+                      global_stat + ((label - global_stat) / (1 + global_count))))
+              }
+            }
+          }.toArray
+        } else agg, // ignore null-labeled observations
+        (agg1, agg2) => inputFeatures.indices.map {
+          feature => {
+            val categories = agg1(feature).keySet ++ agg2(feature).keySet
+            categories.map(category =>
+              category -> {
+                val (counter1, stat1) = agg1(feature).getOrElse(category, (0.0, 0.0))
+                val (counter2, stat2) = agg2(feature).getOrElse(category, (0.0, 0.0))
+                $(targetType) match {
+                  case TargetEncoder.TARGET_BINARY => (counter1 + counter2, stat1 + stat2)
+                  case TargetEncoder.TARGET_CONTINUOUS => (counter1 + counter2,
+                    ((counter1 * stat1) + (counter2 * stat2)) / (counter1 + counter2))
+                }
+              }).toMap
+          }
+        }.toArray)
+
+    // encodings: Map[feature, Map[Some(category), encoding]]
+    val encodings: Map[String, Map[Option[Double], Double]] =
+      inputFeatures.zip(stats).map {
+        case (feature, stat) =>
+          val (global_count, global_stat) = stat.get(TargetEncoder.UNSEEN_CATEGORY).get
+          feature -> stat.map {
+            case (cat, (class_count, class_stat)) => cat -> {
+              val weight = class_count / (class_count + $(smoothing))
+              $(targetType) match {
+                case TargetEncoder.TARGET_BINARY =>

Review Comment:
   This all might be worth a few lines of comments explaining the math here
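   For reference, a sketch of the math as it reads from the code above (inferred, not authoritative). Per category, the seqOp keeps (count, stat), where stat is a positive count for 'binary' and a running mean for 'continuous' (updated as m + (x - m) / (n + 1)); the combOp adds counts, and merges means as a count-weighted average. The final encoding then shrinks the per-category statistic toward the global one:

       // weight = n / (n + smoothing); smoothing = 0 keeps the raw category
       // statistic, larger smoothing pulls rare categories toward the global
       // statistic (categoryStat: positive rate or mean, per targetType).
       def smoothedEncoding(
           categoryCount: Double, categoryStat: Double,
           globalStat: Double, smoothing: Double): Double = {
         val weight = categoryCount / (categoryCount + smoothing)
         weight * categoryStat + (1 - weight) * globalStat
       }

       // e.g. a category seen twice with positive rate 1.0, global rate 0.4,
       // smoothing = 1.0: weight = 2/3 -> encoding = 2/3 * 1.0 + 1/3 * 0.4 = 0.8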
