srowen commented on a change in pull request #31693:
URL: https://github.com/apache/spark/pull/31693#discussion_r584771011
##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
##########
@@ -934,31 +934,56 @@ class LogisticRegression @Since("1.2.0") (
       instances: RDD[Instance],
       actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
+      featuresMean: Array[Double],
       numClasses: Int,
-      initialCoefWithInterceptMatrix: Matrix,
+      initialCoefWithInterceptArray: Array[Double],
       regularization: Option[L2Regularization],
       optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = {
     val numFeatures = featuresStd.length
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val bcFeaturesMean = instances.context.broadcast(featuresMean)
-    val standardized = instances.mapPartitions { iter =>
+    val scaled = instances.mapPartitions { iter =>
       val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
       val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
     val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
-    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(scaled, maxMemUsage)
       .persist(StorageLevel.MEMORY_AND_DISK)
       .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
-    val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept),
-      checkMultinomial(numClasses))(_)
+    val multinomial = checkMultinomial(numClasses)
+    val fitWithMean = !multinomial && $(fitIntercept) &&
+      (!isSet(lowerBoundsOnIntercepts) || $(lowerBoundsOnIntercepts)(0).isNegInfinity) &&
+      (!isSet(upperBoundsOnIntercepts) || $(upperBoundsOnIntercepts)(0).isPosInfinity)
+
+    val costFun = if (multinomial) {
+      // TODO: create a separate BlockMultinomialLogisticAggregator for clearness
+      val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses,
+        $(fitIntercept), true)(_)
+      new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+    } else {
+      val getAggregatorFunc = new BlockBinaryLogisticAggregator(bcFeaturesStd, bcFeaturesMean,
+        $(fitIntercept), fitWithMean)(_)
+      new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+    }
+
+    if (fitWithMean) {
+      var i = 0
+      var adapt = 0.0
+      while (i < numFeatures) {
+        if (featuresStd(i) != 0) {
+          adapt += initialCoefWithInterceptArray(i) * featuresMean(i) / featuresStd(i)
Review comment:
Probably a naive question but why multiply by the mean rather than
subtract? I understand it's to compute an adjustment to the intercept, just
wondering where this comes from.
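
For context on where the mean/std term can come from: with scaling only, the margin is b + sum_i w_i * x_i / std_i, while with centering plus scaling it is b' + sum_i w_i * (x_i - mean_i) / std_i; the two agree for every x exactly when b' = b + sum_i w_i * mean_i / std_i, which appears to be the sum the `adapt` loop accumulates. Below is a small standalone Scala sketch (not part of the PR; all values hypothetical) that checks this identity numerically:

// Standalone sketch (not from the PR): checks that adding sum_i(w_i * mean_i / std_i)
// to the intercept is the change of parameterization implied by centering the features.
// All values below are hypothetical.
object InterceptAdaptCheck {
  def main(args: Array[String]): Unit = {
    val w    = Array(0.3, -1.2, 2.0)   // coefficients
    val mean = Array(1.0, 4.0, -2.0)   // feature means
    val std  = Array(2.0, 0.5, 3.0)    // feature standard deviations
    val b    = 0.7                     // intercept in the scaled-only parameterization
    val x    = Array(5.0, -1.0, 0.25)  // an arbitrary input point

    // The adjustment the `adapt` loop accumulates: sum_i w_i * mean_i / std_i.
    val adapt = w.indices.map(i => w(i) * mean(i) / std(i)).sum

    // Margin with scaling only: b + sum_i w_i * x_i / std_i.
    val marginScaledOnly = b + w.indices.map(i => w(i) * x(i) / std(i)).sum
    // Margin with centering + scaling and the shifted intercept b + adapt.
    val marginCentered =
      (b + adapt) + w.indices.map(i => w(i) * (x(i) - mean(i)) / std(i)).sum

    // The two margins coincide for any x, so multiplying the coefficient by the mean
    // (and dividing by the std) is the intercept shift that keeps the model unchanged.
    println(f"scaled-only margin: $marginScaledOnly%.6f  centered margin: $marginCentered%.6f")
  }
}

Running the sketch prints the same margin twice, which is the identity the intercept adaptation relies on.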