ykerzhner commented on a change in pull request #31693:
URL: https://github.com/apache/spark/pull/31693#discussion_r585589350
##########
File path:
mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
##########
@@ -934,31 +934,56 @@ class LogisticRegression @Since("1.2.0") (
instances: RDD[Instance],
actualBlockSizeInMB: Double,
featuresStd: Array[Double],
+ featuresMean: Array[Double],
numClasses: Int,
- initialCoefWithInterceptMatrix: Matrix,
+ initialCoefWithInterceptArray: Array[Double],
regularization: Option[L2Regularization],
optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]])
= {
val numFeatures = featuresStd.length
val bcFeaturesStd = instances.context.broadcast(featuresStd)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
- val standardized = instances.mapPartitions { iter =>
+ val scaled = instances.mapPartitions { iter =>
val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 /
std else 0.0 }
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd,
false, true)
iter.map { case Instance(label, weight, vec) => Instance(label, weight,
func(vec)) }
}
val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
- val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized,
maxMemUsage)
+ val blocks = InstanceBlock.blokifyWithMaxMemUsage(scaled, maxMemUsage)
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")
- val getAggregatorFunc = new BlockLogisticAggregator(numFeatures,
numClasses, $(fitIntercept),
- checkMultinomial(numClasses))(_)
+ val multinomial = checkMultinomial(numClasses)
+ val fitWithMean = !multinomial && $(fitIntercept) &&
+ (!isSet(lowerBoundsOnIntercepts) ||
$(lowerBoundsOnIntercepts)(0).isNegInfinity) &&
+ (!isSet(upperBoundsOnIntercepts) ||
$(upperBoundsOnIntercepts)(0).isPosInfinity)
+
+ val costFun = if (multinomial) {
+ // TODO: create a separate BlockMultinomialLogisticAggregator for
clearness
+ val getAggregatorFunc = new BlockLogisticAggregator(numFeatures,
numClasses,
+ $(fitIntercept), true)(_)
+ new RDDLossFunction(blocks, getAggregatorFunc, regularization,
$(aggregationDepth))
+ } else {
+ val getAggregatorFunc = new
BlockBinaryLogisticAggregator(bcFeaturesStd, bcFeaturesMean,
+ $(fitIntercept), fitWithMean)(_)
+ new RDDLossFunction(blocks, getAggregatorFunc, regularization,
$(aggregationDepth))
+ }
+
+ if (fitWithMean) {
+ var i = 0
+ var adapt = 0.0
+ while (i < numFeatures) {
+ if (featuresStd(i) != 0) {
+ adapt += initialCoefWithInterceptArray(i) * featuresMean(i) /
featuresStd(i)
Review comment:
Oops, I wrote the explanation last night, but didnt submit the review
till this morning.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]