WeichenXu123 commented on code in PR #40097: URL: https://github.com/apache/spark/pull/40097#discussion_r1113903042
########## connector/connect/client/jvm/src/main/scala/org/apache/spark/ml/classification/Classifier.scala: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.classification + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.param.shared.HasRawPredictionCol +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DataType, StructType} + +// TODO: revisit it to check whether can put it into mllib-common + +/** + * (private[spark]) Params for classification. 
+ */ +private[spark] trait ClassifierParams extends PredictorParams with HasRawPredictionCol { + + override protected def validateAndTransformSchema( + schema: StructType, + fitting: Boolean, + featuresDataType: DataType): StructType = { + val parentSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType) + SchemaUtils.appendColumn(parentSchema, $(rawPredictionCol), new VectorUDT) + } +} + +/** + * Single-label binary or multiclass classification. Classes are indexed {0, 1, ..., numClasses - + * 1}. + * + * @tparam FeaturesType + * Type of input features. E.g., `Vector` + * @tparam E + * Concrete Estimator type + * @tparam M + * Concrete Model type + */ +abstract class Classifier[ + FeaturesType, + E <: Classifier[FeaturesType, E, M], + M <: ClassificationModel[FeaturesType, M]] + extends Predictor[FeaturesType, E, M] + with ClassifierParams { + + /** @group setParam */ + def setRawPredictionCol(value: String): E = set(rawPredictionCol, value).asInstanceOf[E] + + // TODO: defaultEvaluator (follow-up PR) +} + +/** + * Model produced by a [[Classifier]]. Classes are indexed {0, 1, ..., numClasses - 1}. + * + * @tparam FeaturesType + * Type of input features. E.g., `Vector` + * @tparam M + * Concrete Model type + */ +abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]] + extends PredictionModel[FeaturesType, M] + with ClassifierParams { + + /** @group setParam */ + def setRawPredictionCol(value: String): M = set(rawPredictionCol, value).asInstanceOf[M] + + /** Number of classes (values which the label can take). 
*/ + def numClasses: Int + + override def transformSchema(schema: StructType): StructType = { + var outputSchema = super.transformSchema(schema) + if ($(predictionCol).nonEmpty) { + outputSchema = SchemaUtils.updateNumValues(schema, $(predictionCol), numClasses) + } + if ($(rawPredictionCol).nonEmpty) { + outputSchema = + SchemaUtils.updateAttributeGroupSize(outputSchema, $(rawPredictionCol), numClasses) + } + outputSchema + } + + /** + * Transforms dataset by reading from [[featuresCol]], and appending new columns as specified by + * parameters: + * - predicted labels as [[predictionCol]] of type `Double` + * - raw predictions (confidences) as [[rawPredictionCol]] of type `Vector`. + * + * @param dataset + * input dataset + * @return + * transformed dataset + */ + override def transform(dataset: Dataset[_]): DataFrame = { + val outputSchema = transformSchema(dataset.schema, logging = true) + + // Output selected columns only. + // This is a bit complicated since it tries to avoid repeated computation. + var outputData = dataset + var numColsOutput = 0 + if (getRawPredictionCol != "") { + val predictRawUDF = udf { features: Any => + predictRaw(features.asInstanceOf[FeaturesType]) + } + // TODO: Enable when DataFrame.withColumn support metadata Review Comment: Q: When will Spark Connect support column metadata? Q2: How many cases in Spark ML require column metadata? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
