Repository: spark Updated Branches: refs/heads/master 8a5a58506 -> 16d8472f7
[SPARK-19746][ML] Faster indexing for logistic aggregator ## What changes were proposed in this pull request? JIRA: [SPARK-19746](https://issues.apache.org/jira/browse/SPARK-19746) The following code is inefficient: ````scala val localCoefficients: Vector = bcCoefficients.value features.foreachActive { (index, value) => val stdValue = value / localFeaturesStd(index) var j = 0 while (j < numClasses) { margins(j) += localCoefficients(index * numClasses + j) * stdValue j += 1 } } ```` `localCoefficients(index * numClasses + j)` calls `Vector.apply` which creates a new Breeze vector and indexes that. Even if it is not that slow to create the object, we will generate a lot of extra garbage that may result in longer GC pauses. This is a hot inner loop, so we should optimize wherever possible. ## How was this patch tested? I don't think there's a great way to test this patch. It's purely performance related, so unit tests should guarantee that we haven't made any unwanted changes. Empirically I observed between 10-40% speedups just running short local tests. I suspect the big differences will be seen when large data/coefficient sizes have to pause for GC more often. I welcome other ideas for testing. Author: sethah <seth.hendrickso...@gmail.com> Closes #17078 from sethah/logistic_agg_indexing. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16d8472f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16d8472f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16d8472f Branch: refs/heads/master Commit: 16d8472f74313b0d1932b69a9807fe0f2ad7057c Parents: 8a5a585 Author: sethah <seth.hendrickso...@gmail.com> Authored: Tue Feb 28 00:34:38 2017 +0000 Committer: DB Tsai <dbt...@dbtsai.com> Committed: Tue Feb 28 00:34:38 2017 +0000 ---------------------------------------------------------------------- .../ml/classification/LogisticRegression.scala | 11 ++++++--- .../LogisticRegressionSuite.scala | 26 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/16d8472f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 892e00f..738b351 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1431,7 +1431,12 @@ private class LogisticAggregator( private var weightSum = 0.0 private var lossSum = 0.0 - private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D) + @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + + s"got type ${bcCoefficients.value.getClass}.)") + } + private val gradientSumArray = new Array[Double](coefficientSize) if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + @@ -1447,7 +1452,7 @@ private class LogisticAggregator( label: Double): Unit = { val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = bcCoefficients.value + val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray val margin = - { var sum = 0.0 @@ -1491,7 +1496,7 @@ private class LogisticAggregator( logistic regression without pivoting. */ val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = bcCoefficients.value + val localCoefficients = coefficientsArray val localGradientArray = gradientSumArray // marginOfLabel is margins(label) in the formula http://git-wip-us.apache.org/repos/asf/spark/blob/16d8472f/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 43547a4..d89a958 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -456,6 +456,32 @@ class LogisticRegressionSuite assert(blrModel.intercept !== 0.0) } + test("sparse coefficients in LogisticAggregator") { + val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val binaryAgg = new LogisticAggregator(bcCoefficientsBinary, bcFeaturesStd, 2, + fitIntercept = true, multinomial = false) + val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + binaryAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrownBinary.getMessage.contains("coefficients only supports dense")) + + val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0))) + val multinomialAgg = new LogisticAggregator(bcCoefficientsMulti, bcFeaturesStd, 3, + fitIntercept = true, multinomial = true) + val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + multinomialAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + bcCoefficientsBinary.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + bcCoefficientsMulti.destroy(blocking = false) + } + test("overflow prediction for multiclass") { val model = new LogisticRegressionModel("mLogReg", Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org