Github user akopich commented on a diff in the pull request:
https://github.com/apache/spark/pull/19565#discussion_r148507781
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -497,40 +481,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer
with Logging {
(u._1, u._2, u._3 + v._3)
}
- val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]],
nonEmptyDocsN: Long) = stats
- .treeAggregate((BDM.zeros[Double](k, vocabSize),
logphatPartOptionBase(), 0L))(
- elementWiseSum, elementWiseSum
- )
+ val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]],
batchSize: Long) =
+ batch.treeAggregate((BDM.zeros[Double](k, vocabSize),
logphatPartOptionBase(), 0L))({
+ case (acc, (_, termCounts)) =>
+ val stat = BDM.zeros[Double](k, vocabSize)
--- End diff --
Actually, we can fix this w/o falling back to `mapPartition`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]