zhengruifeng commented on a change in pull request #28349:
URL: https://github.com/apache/spark/pull/28349#discussion_r415266581
##########
File path:
mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
##########
@@ -103,3 +103,129 @@ private[ml] class HingeAggregator(
}
}
}
+
+
+/**
+ * BlockHingeAggregator computes the gradient and loss for Hinge loss function
as used in
+ * binary classification for instances in sparse or dense vector in an online
fashion.
+ *
+ * Two BlockHingeAggregators can be merged together to have a summary of loss
and gradient of
+ * the corresponding joint dataset.
+ *
+ * NOTE: The feature values are expected to be standardized before computation.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ */
+private[ml] class BlockHingeAggregator(
+ numFeatures: Int,
+ fitIntercept: Boolean,
+ blockSize: Int)(bcCoefficients: Broadcast[Vector])
+ extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] {
+
+ private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures +
1 else numFeatures
+ protected override val dim: Int = numFeaturesPlusIntercept
+ @transient private lazy val coefficientsArray = bcCoefficients.value match {
+ case DenseVector(values) => values
+ case _ => throw new IllegalArgumentException(s"coefficients only supports
dense vector" +
+ s" but got type ${bcCoefficients.value.getClass}.")
+ }
+
+ @transient private lazy val linear = {
+ if (fitIntercept) {
+ new DenseVector(coefficientsArray.take(numFeatures))
+ } else {
+ new DenseVector(coefficientsArray)
+ }
+ }
+
+ @transient private lazy val intercept =
+ if (fitIntercept) coefficientsArray(numFeatures) else 0.0
+
+ @transient private lazy val linearGradSumVec =
+ if (fitIntercept) new DenseVector(Array.ofDim[Double](numFeatures)) else
null
+
+ @transient private lazy val auxiliaryVec =
+ new DenseVector(Array.ofDim[Double](blockSize))
+
+ /**
+ * Add a new training instance block to this HingeAggregator, and update the
loss and gradient
+ * of the objective function.
+ *
+ * @param block The InstanceBlock to be added.
+ * @return This HingeAggregator object.
+ */
+ def add(block: InstanceBlock): this.type = {
+ require(block.matrix.isTransposed)
+ require(numFeatures == block.numFeatures, s"Dimensions mismatch when
adding new " +
+ s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+ require(block.weightIter.forall(_ >= 0),
+ s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be
>= 0.0")
+
+ if (block.weightIter.forall(_ == 0)) return this
+ val size = block.size
+ val localGradientSumArray = gradientSumArray
+
+ // vec here represents dotProducts
+ val vec = if (size == blockSize) {
+ auxiliaryVec
+ } else {
+ // the last block within one partition may be of size less than blockSize
+ new DenseVector(Array.ofDim[Double](size))
+ }
+
+ if (fitIntercept) {
+ var i = 0
+ while (i < size) { vec.values(i) = intercept; i += 1 }
+ BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+ } else {
+ BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
+ }
+
+ // in-place convert dotProducts to gradient scales
+ // then, vec represents gradient scales
+ var i = 0
+ while (i < size) {
+ val weight = block.getWeight(i)
+ if (weight > 0) {
+ weightSum += weight
+ // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1)
(f_w(x)))
+ // Therefore the gradient is -(2y - 1)*x
+ val label = block.getLabel(i)
+ val labelScaled = 2 * label - 1.0
+ val loss = (1.0 - labelScaled * vec(i)) * weight
+ if (loss > 0) {
+ lossSum += loss
+ val gradScale = -labelScaled * weight
+ vec.values(i) = gradScale
+ } else {
+ vec.values(i) = 0.0
+ }
+ } else {
+ vec.values(i) = 0.0
+ }
+ i += 1
+ }
+
+ // predictions are all correct, no gradient signal
+ if (vec.values.forall(_ == 0)) return this
+
+ block.matrix match {
+ case dm: DenseMatrix =>
+ BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values,
dm.numCols,
+ vec.values, 1, 1.0, localGradientSumArray, 1)
+ if (fitIntercept) localGradientSumArray(numFeatures) += vec.values.sum
+
+ case sm: SparseMatrix if fitIntercept =>
+ BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
Review comment:
No avaiable method to update the first `numFeatures`, so need a temp
output vector `linearGradSumVec`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]