zhengruifeng commented on a change in pull request #26124: 
[SPARK-29224][ML]Implement Factorization Machines as a ml-pipeline component 
URL: https://github.com/apache/spark/pull/26124#discussion_r350098224
 
 

 ##########
 File path: 
mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
 ##########
 @@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.regression
+
+import scala.util.Random
+
+import breeze.linalg.{axpy => brzAxpy, norm => brzNorm, Vector => BV}
+import breeze.numerics.{sqrt => brzSqrt}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.regression.FactorizationMachines._
+import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
+import org.apache.spark.mllib.{linalg => OldLinalg}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => 
OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.mllib.optimization.{Gradient, GradientDescent, 
SquaredL2Updater, Updater}
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Params for Factorization Machines
+ */
+private[ml] trait FactorizationMachinesParams
+  extends PredictorParams
+  with HasMaxIter with HasStepSize with HasTol with HasSolver {
+
+  /**
+   * Param for dimensionality of the factors (> 0)
+   * @group param
+   */
+  @Since("3.0.0")
+  final val factorSize: IntParam = new IntParam(this, "factorSize",
+    "Dimensionality of the factor vectors, " +
+      "which are used to get pairwise interactions between variables",
+    ParamValidators.gt(0))
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getFactorSize: Int = $(factorSize)
+
+  /**
+   * Param for whether to fit global bias term
+   * @group param
+   */
+  @Since("3.0.0")
+  final val fitBias: BooleanParam = new BooleanParam(this, "fitBias",
+    "whether to fit global bias term")
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getFitBias: Boolean = $(fitBias)
+
+  /**
+   * Param for whether to fit linear term (aka 1-way term)
+   * @group param
+   */
+  @Since("3.0.0")
+  final val fitLinear: BooleanParam = new BooleanParam(this, "fitLinear",
+    "whether to fit linear term (aka 1-way term)")
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getFitLinear: Boolean = $(fitLinear)
+
+  /**
+   * Param for L2 regularization parameter (>= 0)
+   * @group param
+   */
+  @Since("3.0.0")
+  final val regParam: DoubleParam = new DoubleParam(this, "regParam",
+    "the magnitude of L2-regularization", ParamValidators.gtEq(0))
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getRegParam: Double = $(regParam)
+
+  /**
+   * Param for mini-batch fraction, must be in range (0, 1]
+   * @group param
+   */
+  @Since("3.0.0")
+  final val miniBatchFraction: DoubleParam = new DoubleParam(this, 
"miniBatchFraction",
+    "fraction of the input data set that should be used for one iteration of 
gradient descent",
+    ParamValidators.inRange(0, 1, false, true))
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getMiniBatchFraction: Double = $(miniBatchFraction)
+
+  /**
+   * Param for standard deviation of initial coefficients
+   * @group param
+   */
+  @Since("3.0.0")
+  final val initStd: DoubleParam = new DoubleParam(this, "initStd",
+    "standard deviation of initial coefficients", ParamValidators.gt(0))
+
+  /** @group getParam */
+  @Since("3.0.0")
+  final def getInitStd: Double = $(initStd)
+
+  /**
+   * The solver algorithm for optimization.
+   * Supported options: "gd", "adamW".
+   * Default: "adamW"
+   *
+   * @group param
+   */
+  @Since("3.0.0")
+  final override val solver: Param[String] = new Param[String](this, "solver",
+    "The solver algorithm for optimization. Supported options: " +
+      s"${supportedSolvers.mkString(", ")}. (Default adamW)",
+    ParamValidators.inArray[String](supportedSolvers))
+}
+
+private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
+
+  /**
+   * Builds the starting coefficient vector for optimization.
+   *
+   * Layout: `factorSize * numFeatures` Gaussian-initialized factor values (scaled by
+   * `initStd`), then `numFeatures` zeros when `fitLinear` is set, then a single zero when
+   * `fitBias` is set.
+   *
+   * @param numFeatures number of input features
+   * @return dense vector of initial coefficients
+   */
+  private[ml] def initCoefficients(numFeatures: Int): OldVector = {
+    val factorPart =
+      Array.fill($(factorSize) * numFeatures)(Random.nextGaussian() * $(initStd))
+    val linearPart =
+      if ($(fitLinear)) new Array[Double](numFeatures) else Array.emptyDoubleArray
+    val biasPart =
+      if ($(fitBias)) new Array[Double](1) else Array.emptyDoubleArray
+    OldVectors.dense(factorPart ++ linearPart ++ biasPart)
+  }
+
+  /**
+   * Optimizes the FM coefficients with mini-batch gradient descent.
+   *
+   * @param data (label, features) pairs to train on
+   * @param numFeatures number of input features
+   * @param loss name of the loss function, resolved through `parseLoss`
+   * @return the optimized coefficients as an ML vector
+   */
+  private[ml] def trainImpl(
+      data: RDD[(Double, OldVector)],
+      numFeatures: Int,
+      loss: String
+    ): Vector = {
+
+    // starting point of the optimization
+    val startCoefficients = initCoefficients(numFeatures)
+
+    // loss gradient and update rule selected from the params
+    val lossGradient =
+      parseLoss(loss, $(factorSize), $(fitBias), $(fitLinear), numFeatures)
+    val solverUpdater = parseSolver($(solver), startCoefficients.size)
+
+    // configure the optimizer step by step, in the same order as the params are read
+    val gd = new GradientDescent(lossGradient, solverUpdater)
+    gd.setStepSize($(stepSize))
+    gd.setNumIterations($(maxIter))
+    gd.setRegParam($(regParam))
+    gd.setMiniBatchFraction($(miniBatchFraction))
+    gd.setConvergenceTol($(tol))
+
+    gd.optimize(data, startCoefficients).asML
+  }
+}
+
+private[ml] object FactorizationMachines {
+
+  /** String name for "gd". */
+  val GD = "gd"
+
+  /** String name for "adamW". */
+  val AdamW = "adamW"
+
+  /** Names of the solvers that FactorizationMachines supports (an Array of GD and AdamW). */
+  val supportedSolvers = Array(GD, AdamW)
+
+  /** String name for "logisticLoss". */
+  val LogisticLoss = "logisticLoss"
+
+  /** String name for "squaredError". */
+  val SquaredError = "squaredError"
+
+  /** Supported loss names: regressor losses, classifier losses, and both concatenated. */
+  val supportedRegressorLosses = Array(SquaredError)
+  val supportedClassifierLosses = Array(LogisticLoss)
+  val supportedLosses = supportedRegressorLosses ++ supportedClassifierLosses
+
+  /**
+   * Maps a solver name to the updater used by gradient descent.
+   *
+   * @param solver solver name; one of `supportedSolvers`
+   * @param coefficientsSize total number of coefficients, passed to the AdamW updater
+   * @return the updater for the requested solver
+   * @throws IllegalArgumentException if `solver` is not a supported solver name
+   */
+  def parseSolver(solver: String, coefficientsSize: Int): Updater = {
+    solver match {
+      case GD => new SquaredL2Updater()
+      case AdamW => new AdamWUpdater(coefficientsSize)
+      // Fail with a descriptive error, as parseLoss does, instead of a bare MatchError.
+      case _ => throw new IllegalArgumentException(
+        s"solver $solver is invalid, supported solvers: ${supportedSolvers.mkString(", ")}")
+    }
+  }
+
+  /**
+   * Maps a loss function name to the matching FM gradient implementation.
+   *
+   * @param lossFunc loss name; `LogisticLoss` or `SquaredError`
+   * @param factorSize dimensionality of the factor vectors
+   * @param fitBias whether a global bias term is fitted
+   * @param fitLinear whether the linear (1-way) term is fitted
+   * @param numFeatures number of input features
+   * @return the gradient for the requested loss
+   * @throws IllegalArgumentException if `lossFunc` is not a supported loss name
+   */
+  def parseLoss(
+      lossFunc: String,
+      factorSize: Int,
+      fitBias: Boolean,
+      fitLinear: Boolean,
+      numFeatures: Int): BaseFactorizationMachinesGradient = {
+    lossFunc match {
+      case LogisticLoss =>
+        new LogisticFactorizationMachinesGradient(factorSize, fitBias, fitLinear, numFeatures)
+      case SquaredError =>
+        new MSEFactorizationMachinesGradient(factorSize, fitBias, fitLinear, numFeatures)
+      case _ => throw new IllegalArgumentException(
+        s"loss function type $lossFunc is invalid, " +
+          s"supported losses: ${supportedLosses.mkString(", ")}")
+    }
+  }
+
+  /**
+   * Splits a flat coefficient vector into its bias, linear and factor parts.
+   *
+   * Expected layout (the one produced by `combineCoefficients`): the first
+   * `numFeatures * factorSize` values are the factors in row-major order, followed by
+   * `numFeatures` linear weights when `fitLinear` is true, followed by one bias value when
+   * `fitBias` is true.
+   *
+   * @param coefficients flat coefficient vector
+   * @param numFeatures number of input features
+   * @param factorSize dimensionality of the factor vectors
+   * @param fitBias whether the vector ends with a bias value
+   * @param fitLinear whether the vector contains linear weights
+   * @return (bias, linear weights, factor matrix); bias is 0.0 and linear is an empty sparse
+   *         vector when the respective term is not fitted
+   */
+  def splitCoefficients(
+    coefficients: Vector,
+    numFeatures: Int,
+    factorSize: Int,
+    fitBias: Boolean,
+    fitLinear: Boolean
+  ): (Double, Vector, Matrix) = {
+    val values = coefficients.toArray
+    val numFactorValues = numFeatures * factorSize
+    val bias = if (fitBias) values(values.length - 1) else 0.0
+    val linear: Vector = if (fitLinear) {
+      // The linear block is exactly numFeatures long. Ending the slice at `size - 1` is only
+      // right when a bias is present; without one it would drop the last linear weight.
+      new DenseVector(values.slice(numFactorValues, numFactorValues + numFeatures))
+    } else {
+      Vectors.sparse(numFeatures, Seq.empty)
+    }
+    // isTransposed = true: the factor values are laid out row by row.
+    val factors = new DenseMatrix(numFeatures, factorSize,
+      values.slice(0, numFactorValues), true)
+    (bias, linear, factors)
+  }
+
+  /**
+   * Flattens bias, linear weights and factors into a single dense coefficient vector.
+   *
+   * The result is the dense values of `factors`, then the linear weights when `fitLinear`
+   * is true, then the bias when `fitBias` is true.
+   *
+   * @param bias global bias value
+   * @param linear linear weights
+   * @param factors factor matrix
+   * @param fitBias whether to append the bias
+   * @param fitLinear whether to append the linear weights
+   * @return the concatenated coefficients
+   */
+  def combineCoefficients(
+    bias: Double,
+    linear: Vector,
+    factors: Matrix,
+    fitBias: Boolean,
+    fitLinear: Boolean
+  ): Vector = {
+    val factorPart = factors.toDense.values
+    val linearPart = if (fitLinear) linear.toArray else Array.emptyDoubleArray
+    val biasPart = if (fitBias) Array(bias) else Array.emptyDoubleArray
+    new DenseVector(factorPart ++ linearPart ++ biasPart)
+  }
+}
+
+/**
+ * Params for FMRegressor; adds nothing on top of [[FactorizationMachinesParams]].
+ */
+private[regression] trait FMRegressorParams extends FactorizationMachinesParams
+
+/**
+ * Factorization Machines learning algorithm for regression.
+ * It supports normal gradient descent and AdamW solver.
+ *
+ * The implementation is based upon:
+ * <a href="https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf">
+ * S. Rendle. "Factorization machines" 2010</a>.
+ *
+ * FM is able to estimate interactions even in problems with huge sparsity
+ * (like advertising and recommendation system).
+ * FM formula is:
+ * {{{
+ *   y = w_0 + \sum\limits^n_{i=1} w_i x_i +
+ *     \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j
+ * }}}
+ * The first two terms denote the global bias and the linear term (the same as in linear
+ * regression), and the last term denotes the pairwise interaction term. {{{v_i}}} describes
+ * the i-th variable with k factors.
+ *
+ * FM regression model uses MSE loss which can be solved by gradient descent 
method, and
+ * regularization terms like L2 are usually added to the loss function to 
prevent overfitting.
+ */
+@Since("3.0.0")
+class FMRegressor @Since("3.0.0") (
+    @Since("3.0.0") override val uid: String)
+  extends Predictor[Vector, FMRegressor, FMRegressorModel]
+  with FactorizationMachines with FMRegressorParams with DefaultParamsWritable 
with Logging {
+
+  @Since("3.0.0")
+  def this() = this(Identifiable.randomUID("fmr"))
+
+  /**
+   * Set the dimensionality of the factors.
+   * Default is 8.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setFactorSize(value: Int): this.type = set(factorSize, value)
+  setDefault(factorSize -> 8)
+
+  /**
+   * Set whether to fit global bias term.
+   * Default is true.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setFitBias(value: Boolean): this.type = set(fitBias, value)
+  setDefault(fitBias -> true)
+
+  /**
+   * Set whether to fit linear term.
+   * Default is true.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setFitLinear(value: Boolean): this.type = set(fitLinear, value)
+  setDefault(fitLinear -> true)
+
+  /**
+   * Set the L2 regularization parameter.
+   * Default is 0.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setRegParam(value: Double): this.type = set(regParam, value)
+  setDefault(regParam -> 0.0)
+
+  /**
+   * Set the mini-batch fraction parameter.
+   * Default is 1.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setMiniBatchFraction(value: Double): this.type = set(miniBatchFraction, 
value)
+  setDefault(miniBatchFraction -> 1.0)
+
+  /**
+   * Set the standard deviation of initial coefficients.
+   * Default is 0.01.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setInitStd(value: Double): this.type = set(initStd, value)
+  setDefault(initStd -> 0.01)
+
+  /**
+   * Set the maximum number of iterations.
+   * Default is 100.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setMaxIter(value: Int): this.type = set(maxIter, value)
+  setDefault(maxIter -> 100)
+
+  /**
+   * Set the initial step size for the first step (like learning rate).
+   * Default is 1.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setStepSize(value: Double): this.type = set(stepSize, value)
+  setDefault(stepSize -> 1.0)
+
+  /**
+   * Set the convergence tolerance of iterations.
+   * Default is 1E-6.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setTol(value: Double): this.type = set(tol, value)
+  setDefault(tol -> 1E-6)
+
+  /**
+   * Set the solver algorithm used for optimization.
+   * Supported options: "gd", "adamW".
+   * Default: "adamW"
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setSolver(value: String): this.type = set(solver, value)
+  setDefault(solver -> AdamW)
+
+  /**
+   * Trains on `dataset`, handling persistence only when the dataset's storage level is
+   * `StorageLevel.NONE`.
+   */
+  override protected[spark] def train(dataset: Dataset[_]): FMRegressorModel =
+    train(dataset, handlePersistence = dataset.storageLevel == StorageLevel.NONE)
+
+  /**
+   * Fits an FM regression model on `dataset` using the squared-error loss.
+   *
+   * The label and features columns are extracted into an RDD, which is persisted for the
+   * duration of training when `handlePersistence` is true. It is released in a `finally`
+   * block so that a failure during training does not leave it persisted.
+   *
+   * @param dataset training data containing the label and features columns
+   * @param handlePersistence whether this method should persist and unpersist the input RDD
+   * @return the fitted model holding the bias, linear and factor coefficients
+   */
+  protected[spark] def train(
+      dataset: Dataset[_],
+      handlePersistence: Boolean): FMRegressorModel = instrumented { instr =>
+    val data: RDD[(Double, OldVector)] =
+      dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
+        case Row(label: Double, features: Vector) =>
+          (label, features)
+      }
+
+    if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
+
+    try {
+      instr.logPipelineStage(this)
+      instr.logDataset(dataset)
+      instr.logParams(this, factorSize, fitBias, fitLinear, regParam,
+        miniBatchFraction, initStd, maxIter, stepSize, tol, solver)
+
+      // The feature dimension is taken from the first row's feature vector.
+      val numFeatures = data.first()._2.size
+      instr.logNumFeatures(numFeatures)
+
+      val coefficients = trainImpl(data, numFeatures, SquaredError)
+
+      val (bias, linear, factors) = splitCoefficients(
+        coefficients, numFeatures, $(factorSize), $(fitBias), $(fitLinear))
+
+      copyValues(new FMRegressorModel(uid, bias, linear, factors))
+    } finally {
+      // Always release the cache taken above, even when training fails.
+      if (handlePersistence) data.unpersist()
+    }
+  }
+
+  @Since("3.0.0")
+  override def copy(extra: ParamMap): FMRegressor = defaultCopy(extra)
+}
+
+@Since("3.0.0")
+object FMRegressor extends DefaultParamsReadable[FMRegressor] {
+
+  @Since("3.0.0")
+  override def load(path: String): FMRegressor = super.load(path)
+}
+
+/**
+ * Model produced by [[FMRegressor]].
+ */
+@Since("3.0.0")
+class FMRegressorModel private[regression] (
 
 Review comment:
   `FMRegressorModel` -> `FMRegressionModel`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to