Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21183#discussion_r188081089 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -473,7 +475,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging { None } - val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => + val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitionsWithIndex --- End diff -- fix scala style: ``` val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitionsWithIndex { (index, docs) => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) val logphatPartOption = logphatPartOptionBase() var nonEmptyDocCount: Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k, seed + index) stat(::, ids) := stat(::, ids) + sstats logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } Iterator((stat, logphatPartOption, nonEmptyDocCount)) } ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org