Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143080481
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of logphat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                        Some(BDV.zeros[Double](k))
    +                                      } else {
    +                                        None
    +                                      }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => 
{
    --- End diff --
    
    Minor: please fix the indentation here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to