Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/4259#discussion_r29091500
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -97,3 +193,153 @@ class LinearRegressionModel private[ml] (
m
}
}
+
+private class LeastSquaresAggregator(
+ weights: Vector,
+ labelStd: Double,
+ labelMean: Double,
+ featuresStd: Array[Double],
+ featuresMean: Array[Double]) extends Serializable {
+
+ private var totalCnt: Long = 0
+ private var lossSum = 0.0
+ private var diffSum = 0.0
+
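+ // Rescale each weight by 1/std and precompute a constant offset so that,
+ // for a raw feature vector x, dot(x, effectiveWeights) + offset equals the
+ // dot product of the original weights with the standardized features
+ // (x - mean) / std. Constant features (std == 0) get weight 0.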
+ private val (effectiveWeightsArray: Array[Double], offset: Double, dim: Int) = {
+ val weightsArray = weights.toArray.clone()
+ var sum = 0.0
+ var i = 0
+ while (i < weightsArray.length) {
+ if (featuresStd(i) != 0.0) {
+ weightsArray(i) /= featuresStd(i)
+ sum += weightsArray(i) * featuresMean(i)
+ } else {
+ weightsArray(i) = 0.0
+ }
+ i += 1
+ }
+ (weightsArray, -sum, weightsArray.length)
+ }
+ private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
+
+ private val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)
+
+ /**
+ * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
+ * of the objective function.
+ *
+ * @param label The label for this data point.
+ * @param data The features for one data point in dense/sparse vector format to be added
+ * into this aggregator.
+ * @return This LeastSquaresAggregator object.
+ */
+ def add(label: Double, data: Vector): this.type = {
+ require(dim == data.size, s"Dimensions mismatch when adding new sample." +
+ s" Expecting $dim but got ${data.size}.")
+
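+ // Residual in the standardized space: the prediction via effectiveWeights
+ // and offset minus the standardized label (label - labelMean) / labelStd.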
+ val diff = dot(data, effectiveWeightsVector) - (label - labelMean) / labelStd + offset
+
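+ // Skip points with zero residual; in the update, constant features
+ // (std == 0) and explicit zero entries contribute nothing to the gradient.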
+ if (diff != 0) {
+ val localGradientSumArray = gradientSumArray
+ data.foreachActive { (index, value) =>
+ if (featuresStd(index) != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += diff * value / featuresStd(index)
+ }
+ }
+ lossSum += diff * diff / 2.0
+ diffSum += diff
+ }
+
+ totalCnt += 1
+ this
+ }
+
+ /**
+ * Merge another LeastSquaresAggregator, and update the loss and gradient
+ * of the objective function.
+ * (Note that it's in-place merging; as a result, `this` object will be modified.)
+ *
+ * @param other The other LeastSquaresAggregator to be merged.
+ * @return This LeastSquaresAggregator object.
+ */
+ def merge(other: LeastSquaresAggregator): this.type = {
+ require(dim == other.dim, s"Dimensions mismatch when merging with another " +
+ s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
+
+ if (this.totalCnt != 0 && other.totalCnt != 0) {
+ totalCnt += other.totalCnt
+ lossSum += other.lossSum
+ diffSum += other.diffSum
+
+ var i = 0
+ val localThisGradientSumArray = this.gradientSumArray
--- End diff ---
This optimization is not necessary, since this is not on the critical path. The
following should be sufficient:
~~~scala
if (other.totalCnt != 0) {
  totalCnt += other.totalCnt
  lossSum += other.lossSum
  diffSum += other.diffSum
  var i = 0
  while (i < dim) {
    this.gradientSumArray(i) += other.gradientSumArray(i)
    i += 1
  }
}
~~~
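
For context, an aggregator with this add/merge shape is typically driven by
Spark's `treeAggregate`: `add` folds points within a partition and `merge`
combines the per-partition aggregators, which is why `merge` must accumulate
into `this` rather than overwrite it. A minimal sketch, with a hypothetical
helper name (`aggregate`) not taken from the PR:

~~~scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical driver (names assumed, not from the PR): one pass over the
// data produces the summed loss/gradient state. `seqOp` adds a single point
// to a partition-local aggregator; `combOp` merges two partial aggregators.
def aggregate(
    data: RDD[LabeledPoint],
    weights: Vector,
    labelStd: Double,
    labelMean: Double,
    featuresStd: Array[Double],
    featuresMean: Array[Double]): LeastSquaresAggregator = {
  data.treeAggregate(
    new LeastSquaresAggregator(weights, labelStd, labelMean, featuresStd, featuresMean))(
      seqOp = (agg, point) => agg.add(point.label, point.features),
      combOp = (agg1, agg2) => agg1.merge(agg2))
}
~~~

Under that pattern, dropping the `this.totalCnt != 0` check is safe: merging
into a fresh aggregator whose counters and gradient array are all zero is a
plain accumulation.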