[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18924 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143159334 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { +Some(BDV.zeros[Double](k)) + } else { +None + } + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : 
(BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- I see now. Thank you. But it seems the style guide suggests moving both parameters onto a new line.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143112965 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { +Some(BDV.zeros[Double](k)) + } else { +None + } + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : 
(BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff --
```
val elementWiseSum = (u: (BDM[Double], Option[BDV[Double]], Long),
    v: (BDM[Double], Option[BDV[Double]], Long)) => {
```
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143084875 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. + * Uses Newton-Rhapson method. * @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters * (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf) + * @param logphat Expectation of estimated log-posterior distribution of + *topics in a document averaged over the batch. + * @param nonEmptyDocsN number of non-empty documents */ - private def updateAlpha(gammat: BDM[Double]): Unit = { + private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = { --- End diff -- With `nonEmptyDocsN: Int`, the method would have to cast it to `Double` itself. With a `Double` parameter the conversion happens implicitly, and since the method is private I don't think it's going to hurt.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143084656 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { +Some(BDV.zeros[Double](k)) + } else { +None + } + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : 
(BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- Do you mean the extra spaces after `u` and `v`?
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143081342 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) + +if (nonEmptyDocsN == 0) { --- End diff -- I don't think that's the case here. But as long as all the cleanup work is done, I would not mind it.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143080051 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { --- End diff -- I'm not a style expert myself. This is just what I would use:
```
val logphatPartOptionBase = () => {
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
}
```
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143080481 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { +Some(BDV.zeros[Double](k)) + } else { +None + } + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : 
(BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- Minor: indent.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143080675 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. + * Uses Newton-Rhapson method. * @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters * (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf) + * @param logphat Expectation of estimated log-posterior distribution of + *topics in a document averaged over the batch. + * @param nonEmptyDocsN number of non-empty documents */ - private def updateAlpha(gammat: BDM[Double]): Unit = { + private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = { --- End diff -- `nonEmptyDocsN: Int`?
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143077626 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- I see.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143069049 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- About `logphatPartOptionBase`: I tried that initially and it failed. This was discussed above with @WeichenXu123. The problem is caused by in-place modifications.
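The in-place modification problem mentioned above can be illustrated with a minimal single-JVM sketch. It is not a reproduction of Spark's behavior: plain `Array[Double]` stands in for `BDV[Double]`, and the names `zeros`, `addInPlace`, `sharedTotals` and `freshTotals` are hypothetical. It only shows why a factory function (`() => Option[...]`) hands every consumer its own zero buffer, whereas a single shared `Option` value is mutated by all of them.

```scala
object ZeroFactorySketch {
  // Hypothetical stand-in for BDV.zeros[Double](k): a mutable zero vector.
  def zeros(k: Int): Array[Double] = Array.fill(k)(0.0)

  // In-place addition, analogous to `+=` on a mutable vector.
  def addInPlace(acc: Array[Double], x: Array[Double]): Unit = {
    var i = 0
    while (i < acc.length) { acc(i) += x(i); i += 1 }
  }

  // Shared value: every "partition" accumulates into the same buffer,
  // so the per-partition results alias each other.
  def sharedTotals(parts: Seq[Array[Double]], k: Int): Seq[Array[Double]] = {
    val base: Option[Array[Double]] = Some(zeros(k))
    parts.map { p => base.foreach(addInPlace(_, p)); base.get }
  }

  // Factory: each "partition" asks for a fresh buffer before accumulating.
  def freshTotals(parts: Seq[Array[Double]], k: Int): Seq[Array[Double]] = {
    val base: () => Option[Array[Double]] = () => Some(zeros(k))
    parts.map { p =>
      val acc = base()
      acc.foreach(addInPlace(_, p))
      acc.get
    }
  }
}
```

With two inputs, `sharedTotals` returns the same accumulated buffer twice, while `freshTotals` returns one independent total per input.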
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143068229 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- About the comments: keep them as you wish.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143067455 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- Maybe define `logphatPartOptionBase` as an Option rather than a function: val logphatPartOptionBase = if (optimizeDocConcentration) { Some(BDV.zeros[Double](k))
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143066229 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) + +if (nonEmptyDocsN == 0) { --- End diff -- But the Spark Scala style guide says: "... `return` is preferred: Use `return` as a guard to simplify control flow without adding a level of indentation".
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143064794 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) + +if (nonEmptyDocsN == 0) { + logWarning("No non-empty documents were submitted in the batch.") + // Therefore, there is no need to update any of the model parameters + return this +} + +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= nonEmptyDocsN.toDouble) +logphatOption.foreach(updateAlpha(_, nonEmptyDocsN)) + +expElogbetaBc.destroy(false) --- End diff -- Great point, thank you. Moreover, it should be destroyed as soon as it is no longer needed.
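The single-pass aggregation in the quoted diff (one combiner over a triple of statistics, an optional `logphat` sum, and a document count, followed by averaging) can be sketched without Spark or Breeze. This is a simplified illustration under stated assumptions: `Array[Double]` replaces `BDM`/`BDV`, `foldLeft` over a local sequence replaces `treeAggregate`, and `Stats`, `combine` and `averaged` are hypothetical names. Like the diff, the combiner assumes `v._2` is defined whenever `u._2` is.

```scala
object ElementWiseSumSketch {
  // (sufficient statistics, optional logphat sum, non-empty document count)
  type Stats = (Array[Double], Option[Array[Double]], Long)

  private def addInPlace(acc: Array[Double], x: Array[Double]): Unit =
    for (i <- acc.indices) acc(i) += x(i)

  // Same shape as `elementWiseSum` in the diff: mutate u in place, add counts.
  val elementWiseSum: (Stats, Stats) => Stats = (u, v) => {
    addInPlace(u._1, v._1)
    // Assumption carried over from the diff: v._2 is defined if u._2 is.
    u._2.foreach(acc => addInPlace(acc, v._2.get))
    (u._1, u._2, u._3 + v._3)
  }

  // Local stand-in for treeAggregate: the zero value comes from a factory,
  // so the in-place updates never touch a shared buffer.
  def combine(parts: Seq[Stats], zero: () => Stats): Stats =
    parts.foldLeft(zero())(elementWiseSum)

  // Average the logphat sum over the number of non-empty documents.
  def averaged(total: Stats): Option[Array[Double]] =
    total._2.map(_.map(_ / total._3.toDouble))
}
```

When the optional component is `None` on both sides, the combiner skips it entirely, which mirrors the "no calculation otherwise" comment in the diff.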
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143060674 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) --- End diff -- Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143060537 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- If I don't assign `logphatPartOptionBase` to a local variable, a NotSerializableException is thrown. Regarding the comments: isn't it necessary to emphasise that the computation happens in the same pass?
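One common cause of this kind of serialization failure is a closure that reads a member of a non-serializable enclosing object and therefore captures `this`. The sketch below is an assumption about the mechanism, not a confirmed diagnosis of this PR: it uses plain Java serialization instead of Spark's closure handling, and `Optimizer`, `capturingThis`, `capturingLocal` and `isSerializable` are hypothetical names. Copying the member into a local `val` first, as the diff does with `optimizeDocConcentration`, means only the copied value is captured.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for a class that is not Serializable.
  class Optimizer(val optimizeDocConcentration: Boolean) {
    // Reads a member directly, so the function value captures `this`.
    def capturingThis: Int => Option[Int] =
      k => if (optimizeDocConcentration) Some(k) else None

    // Copies the member to a local val first, so only a Boolean is captured.
    def capturingLocal: Int => Option[Int] = {
      val flag = optimizeDocConcentration
      k => if (flag) Some(k) else None
    }
  }

  // Attempts Java serialization and reports whether it succeeded.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling `isSerializable` on the two function values shows the difference between capturing the enclosing instance and capturing a local copy.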
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143056727 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) --- End diff -- indent
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143057944 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) + +if (nonEmptyDocsN == 0) { --- End diff -- I would use `if (nonEmptyDocsN > 0) { update ... } else { logWarning... }` here, just to avoid multiple exits.
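For illustration, the single-exit shape suggested in the comment above might look like the following minimal sketch. It is plain Scala with no Spark or Breeze dependency. `SketchOptimizer`, `submit`, and the `updates` counter are hypothetical stand-ins for the optimizer, `submitMiniBatch`, and the model-parameter updates, and `println` stands in for `logWarning`.

```scala
// Hypothetical stand-in for OnlineLDAOptimizer; not the real Spark class.
final class SketchOptimizer {
  // Counts how many times the model parameters were updated.
  var updates: Int = 0

  // Mirrors the suggested shape: one if/else and a single exit point.
  def submit(nonEmptyDocsN: Long): SketchOptimizer = {
    if (nonEmptyDocsN > 0) {
      updates += 1 // stands in for updateLambda / updateAlpha
    } else {
      // stands in for logWarning(...)
      println("No non-empty documents were submitted in the batch.")
    }
    this
  }
}
```

With this shape, any cleanup that must run for both branches can sit once after the if/else, before `this` is returned.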
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143055573 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. --- End diff -- Thinking again, logphatPartOptionBase should have been evaluated on the driver side. I'm not sure assigning optimizeDocConcentration to a local variable is necessary here. The comment can be simpler: calculate logphat only if optimizeDocConcentration is true, to update alpha.
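As a rough sketch of the idea being discussed ("calculate logphat only if optimizeDocConcentration is true"), the factory pattern from the diff can be shown without Spark or Breeze. `AccumulatorSketch` and `accumulatorFactory` are hypothetical names, and `Array[Double]` stands in for `BDV[Double]`; this is not the PR's code.

```scala
object AccumulatorSketch {
  // Returns a factory rather than a value, so every caller (for example each
  // partition) receives its own fresh accumulator, or None when the statistic
  // is not needed at all.
  def accumulatorFactory(enabled: Boolean, k: Int): () => Option[Array[Double]] =
    () => if (enabled) Some(Array.fill(k)(0.0)) else None
}
```

Because the flag is fixed when the factory is built, the code that consumes the accumulator only has to call `foreach` on the resulting `Option` and never re-checks the flag.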
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143058244 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) + +if (nonEmptyDocsN == 0) { + logWarning("No non-empty documents were submitted in the batch.") + // Therefore, there is no need to update any of the model parameters + return this +} + +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= nonEmptyDocsN.toDouble) +logphatOption.foreach(updateAlpha(_, nonEmptyDocsN)) + +expElogbetaBc.destroy(false) --- End diff -- expElogbetaBc should always be destroyed.
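One way to guarantee the cleanup asked for above, including on the early empty-batch exit, is a try/finally wrapper. The sketch below is plain Scala under stated assumptions: `FakeBroadcast` is a hypothetical stand-in that only records whether `destroy()` ran, and `withBroadcast` is an illustrative helper, not something taken from the PR or from Spark.

```scala
// Hypothetical stand-in for a broadcast handle; it only tracks whether destroy() ran.
final class FakeBroadcast[T](val value: T) {
  var destroyed: Boolean = false
  def destroy(): Unit = { destroyed = true }
}

object DestroySketch {
  // Runs `body`, then releases the broadcast on every exit path:
  // normal result, early return value, or exception.
  def withBroadcast[T, R](bc: FakeBroadcast[T])(body: T => R): R =
    try body(bc.value)
    finally bc.destroy()
}
```

The single-exit if/else form achieves the same effect without a helper, since the destroy call then sits once after both branches.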
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r143003890 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, 
logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= nonEmptyDocsN.toDouble) --- End diff -- Thanks for the comments, @jkbradley and @hhbyyh. The check has been added. I have also added a warning message for the case of an "empty" batch. I believe a user should know when something like that happens.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142833499 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) --- End diff -- agree.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142833374 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt --- End diff -- Sure. Since we're talking about consistency with the old LDA, it's fine to keep using batchSize here.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142831316 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt --- End diff -- This can wait.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142826379 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) --- End diff -- That sounds right to me.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142826326 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape - -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val optimizeDocConcentration = this.optimizeDocConcentration +// If and only if optimizeDocConcentration is set true, +// we calculate logphat in the same pass as other statistics. +// No calculation of loghat happens otherwise. +val logphatPartOptionBase = () => if (optimizeDocConcentration) { + Some(BDV.zeros[Double](k)) +} else { + None +} + +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + 
Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= nonEmptyDocsN.toDouble) --- End diff -- Good point about dividing by 0, @hhbyyh. We should probably just check nonEmptyDocsN to see if it's 0, and if it is, skip all of these updates. That's related to, but actually separate from, the follow-up SPARK-22111.
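The divide-by-zero concern raised above can be illustrated with a small guard. This is a sketch only: `LogphatSketch.average` is a hypothetical helper, `Array[Double]` stands in for `BDV[Double]`, and the real change in the PR guards the whole update block rather than a single division.

```scala
object LogphatSketch {
  // Averages an optional accumulated sum over n documents.
  // Returns None when there is nothing to average, so the caller skips the
  // update instead of dividing by zero.
  def average(sumOpt: Option[Array[Double]], n: Long): Option[Array[Double]] =
    if (n > 0) sumOpt.map(_.map(_ / n.toDouble)) else None
}
```

A caller would then write something like `LogphatSketch.average(sum, n).foreach(update)`, so an empty batch leaves the model state untouched.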
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142632240 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) --- End diff -- Thanks for the good point. Do I understand correctly that if a batch without any non-empty docs is submitted, the `submitMiniBatch` method shouldn't change the state of `LDAOptimizer`? cc @WeichenXu123
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142625490 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, nonEmptyDocsN)) + +expElogbetaBc.destroy(false) + this } /** - * Update lambda based on the batch submitted. batchSize can be different for each iteration. + * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration. --- End diff -- Thanks. Comment reverted.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624984 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt --- End diff -- I believe this will be settled in SPARK-22111.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624246 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624340 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats --- End diff -- Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142624093 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None --- End diff -- Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142622117 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration --- End diff -- Thanks. Fixed.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142620788 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration --- End diff -- This line is necessary to avoid serializing `LDASuite`, which is not serializable.
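The general mechanism behind this remark can be illustrated without Spark. The following is a minimal sketch, assuming Scala 2.12 or 2.13, where function literals are serializable: reading a field inside a closure captures `this`, while copying the field to a local `val` first captures only the value. `Owner`, `capturesThis`, `capturesLocal`, and `isSerializable` are hypothetical names introduced for illustration and are not part of the PR.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable enclosing object.
class Owner(val optimizeDocConcentration: Boolean) {
  // Reads the field inside the closure, so the closure captures `this`.
  def capturesThis: () => Boolean = () => optimizeDocConcentration

  // Copies the field to a local val first, so only the Boolean is captured.
  def capturesLocal: () => Boolean = {
    val optimizeDocConcentration = this.optimizeDocConcentration
    () => optimizeDocConcentration
  }
}

object ClosureCaptureDemo {
  // Tries Java serialization on the closure and reports whether it succeeded.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val owner = new Owner(true)
    println(isSerializable(owner.capturesThis))
    println(isSerializable(owner.capturesLocal))
  }
}
```

Spark serializes the closure passed to `mapPartitions` before shipping it to executors, which is why the local copy matters there.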
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571627 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None --- End diff -- indent
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142572013 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt --- End diff -- Instead of batchSize, I think we can use nonEmptyDocsN directly. @jkbradley, please double-check, since this would be a behavior change. Also note that nonEmptyDocsN can be zero, so be careful about division by zero.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142574222 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, nonEmptyDocsN)) + +expElogbetaBc.destroy(false) + this } /** - * Update lambda based on the batch submitted. batchSize can be different for each iteration. + * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration. --- End diff -- Comments should be consistent with the code. Update the code or revert the comment change.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142574453 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))( +elementWiseSum, elementWiseSum + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) --- End diff -- This should use nonEmptyDocsN to be consistent with the original implementation, and should also guard against division by zero.
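One way to read the division-by-zero concern: on the JVM, dividing a `Double` by zero does not throw but silently yields `NaN` or an infinity, which would then flow into the alpha update. A minimal sketch of a guard follows, using plain arrays instead of Breeze vectors; `LogphatAverage` and `average` are hypothetical names, not the PR's code.

```scala
object LogphatAverage {
  // Hypothetical helper: average the summed per-document statistics over the
  // number of non-empty documents, and skip the update when there are none.
  def average(logphatSum: Array[Double], nonEmptyDocsN: Long): Option[Array[Double]] =
    if (nonEmptyDocsN > 0L) Some(logphatSum.map(_ / nonEmptyDocsN.toDouble))
    else None

  def main(args: Array[String]): Unit = {
    println(average(Array(-2.0, -4.0), 2L).map(_.mkString(", ")))
    println(average(Array(0.0, 0.0), 0L).map(_.mkString(", ")))
  }
}
```

Returning `None` for an empty batch lets the caller skip the alpha update with `foreach`, in the same style the diff already uses for `logphatOption`.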
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571342 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration --- End diff -- If it's only used once, reassigning it to a local variable is not necessary.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571728 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + (u._1, u._2, u._3 + v._3) +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats --- End diff -- extra space after nonEmptyDocsN
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571603 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration --- End diff -- The comment is not quite accurate. If `optimizeDocConcentration==false`, logphat will not be calculated.
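The point about logphat not being calculated at all when the flag is off can be sketched with an `Option` accumulator. This is an illustrative sketch with plain arrays rather than Breeze vectors; `OptionalAccumulator`, `newAccumulator`, and `add` are hypothetical names that only mirror the role of `logphatPartOptionBase` in the diff.

```scala
object OptionalAccumulator {
  // Hypothetical factory: a fresh zero vector when the statistic is wanted,
  // None otherwise.
  def newAccumulator(enabled: Boolean, k: Int): Option[Array[Double]] =
    if (enabled) Some(Array.fill(k)(0.0)) else None

  // Adds `contribution` into the accumulator in place. The by-name parameter
  // is evaluated only when the accumulator exists, so nothing is computed
  // for None.
  def add(acc: Option[Array[Double]], contribution: => Array[Double]): Unit =
    acc.foreach { a =>
      val c = contribution
      var i = 0
      while (i < a.length) { a(i) += c(i); i += 1 }
    }

  def main(args: Array[String]): Unit = {
    val on = newAccumulator(enabled = true, k = 2)
    add(on, Array(1.0, 2.0))
    add(on, Array(0.5, 0.5))
    println(on.map(_.mkString(", ")))

    val off = newAccumulator(enabled = false, k = 2)
    add(off, sys.error("never evaluated"))
    println(off)
  }
}
```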
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571685 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val 
elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- indent
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140630215 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs => --- End diff -- Done.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140621444 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs => --- End diff -- Let's use Long for the doc count since it could overflow for large datasets and miniBatchFraction
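The overflow concern is easy to reproduce in isolation: `Int` arithmetic on the JVM wraps around silently instead of failing. The sketch below sums hypothetical per-partition counts both ways; `DocCountOverflow`, `sumAsInt`, and `sumAsLong` are illustrative names only.

```scala
object DocCountOverflow {
  // Summing as Int wraps around once the total exceeds Int.MaxValue.
  def sumAsInt(counts: Seq[Int]): Int = counts.sum

  // Widening each count to Long before summing keeps the total exact.
  def sumAsLong(counts: Seq[Int]): Long = counts.map(_.toLong).sum

  def main(args: Array[String]): Unit = {
    val perPartition = Seq(Int.MaxValue, 1) // hypothetical per-partition counts
    println(sumAsInt(perPartition))  // wrapped value
    println(sumAsLong(perPartition)) // exact value
  }
}
```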
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140249325 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- I think it doesn't matter if `seq` does not return a reference to its left parameter but returns a new object.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140199136 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- Please check out the updated PR. I have added `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`. Unfortunately, the aggregation can no longer be done purely in place, since `Int` is immutable. This shouldn't be a big deal, since the matrices and vectors are still updated in place.
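The trade-off described here (mutable buffers merged in place, an immutable counter rebuilt into a new tuple) can be sketched as follows. This is an illustration with plain arrays, and `foldLeft` stands in for Spark's `treeAggregate`; `ElementWiseSum`, `Stats`, `addInPlace`, and `merge` are hypothetical names rather than the PR's code.

```scala
object ElementWiseSum {
  // (summed statistics, optional logphat sum, non-empty document count)
  type Stats = (Array[Double], Option[Array[Double]], Long)

  private def addInPlace(dst: Array[Double], src: Array[Double]): Unit = {
    var i = 0
    while (i < dst.length) { dst(i) += src(i); i += 1 }
  }

  // The arrays of `u` are mutated in place; only the small tuple wrapper and
  // the count are new objects.
  def merge(u: Stats, v: Stats): Stats = {
    addInPlace(u._1, v._1)
    for (a <- u._2; b <- v._2) addInPlace(a, b)
    (u._1, u._2, u._3 + v._3)
  }

  def main(args: Array[String]): Unit = {
    val zero: Stats = (Array(0.0, 0.0), Some(Array(0.0)), 0L)
    val parts: Seq[Stats] = Seq(
      (Array(1.0, 2.0), Some(Array(0.5)), 3L),
      (Array(3.0, 4.0), Some(Array(1.5)), 2L))
    val (statSum, logphat, n) = parts.foldLeft(zero)(merge)
    println(statSum.mkString(", "))
    println(logphat.map(_.mkString(", ")))
    println(n)
  }
}
```

Allocating one small tuple per merge is cheap next to the k-by-vocabSize matrix, which is consistent with the remark that this should not be a big deal.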
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140193380 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- But should we have `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}` stuff?
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140188363 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- I think it's OK. Let's keep the logic exactly the same as in the old version.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140183412 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- Another suggestion: let's have something like `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`, where the `Int` stands for the number of non-empty elements in a partition. What do you think?
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140180799 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- @WeichenXu123, you are right. So should we add `stats.count()` or should we rather embed the counting in the aggregation phase so that we avoid the second pass?
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140111453 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- There's a small difference between the old `N` and `batchSize`: `N` in the old code does not count empty docs.
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140032198 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- I also renamed `N` to `batchSize`, since that is what it represents.
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140031900 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) --- End diff -- OK.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140030582 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) --- End diff -- Add an inline doc note here: "We calculate logphat in the same pass as other statistics, but we only need it if we are optimizing docConcentration."
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139832997 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- Same with N
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139832967 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer { } /** - * Update alpha based on `gammat`, the inferred topic distributions for documents in the - * current mini-batch. Uses Newton-Rhapson method. + * Update alpha based on `logphat`. --- End diff -- Document what logphat is
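As a reading aid for the request above, here is what `logphat` appears to denote, read off the diff in this thread: the per-document `LDAUtils.dirichletExpectation(gammad)` vectors are summed over the non-empty documents of the mini-batch and the sum is divided by a normaliser. The symbols below are assumptions introduced for illustration: $\mathcal{D}$ is the set of non-empty documents in the mini-batch, $K$ the number of topics, $\psi$ the digamma function, and $N$ the normaliser, whose exact choice (`batchSize` or the count of non-empty documents) is what the neighbouring comments discuss. The digamma form is the standard expectation of $\log\theta$ under a Dirichlet distribution and is assumed to be what `dirichletExpectation` computes.

```latex
\widehat{\log p}_k
  \;=\; \frac{1}{N} \sum_{d \in \mathcal{D}} \mathbb{E}\left[\log \theta_{dk} \mid \gamma_d\right]
  \;=\; \frac{1}{N} \sum_{d \in \mathcal{D}}
        \left( \psi(\gamma_{dk}) - \psi\Bigl(\sum_{j=1}^{K} \gamma_{dj}\Bigr) \right),
  \qquad k = 1, \dots, K .
```

Because this is a plain sum of $K$-dimensional vectors, it can be accumulated per partition and merged element-wise, which is why the individual `gammad` vectors no longer need to be collected on the driver.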
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139514402 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase nonEmptyDocs.foreach { case (_, termCounts: Vector) => val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption)) +} + +val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]), + v : (BDM[Double], Option[BDV[Double]])) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + u +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats + .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase))( +elementWiseSumInPlace, elementWiseSumInPlace + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, batchSize)) + +expElogbetaBc.destroy(false) +stats.unpersist() --- End diff -- Do you mean `stats.unpersist()`? Sure, I got rid of it.
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139514301 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase --- End diff -- Great point. Thank you.
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139467949 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase nonEmptyDocs.foreach { case (_, termCounts: Vector) => val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption)) +} + +val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]), + v : (BDM[Double], Option[BDV[Double]])) => { + u._1 += v._1 + u._2.foreach(_ += v._2.get) + u +} + +val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats + .treeAggregate((BDM.zeros[Double](k, 
vocabSize), logphatPartOptionBase))( +elementWiseSumInPlace, elementWiseSumInPlace + ) +val batchResult = statsSum *:* expElogbeta.t // Note that this is an optimization to avoid batch.count -updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt) -if (optimizeDocConcentration) updateAlpha(gammat) +val batchSize = (miniBatchFraction * corpusSize).ceil.toInt +updateLambda(batchResult, batchSize) + +logphatOption.foreach(_ /= batchSize.toDouble) +logphatOption.foreach(updateAlpha(_, batchSize)) + +expElogbetaBc.destroy(false) +stats.unpersist() --- End diff -- This line is useless.
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r139470472 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase --- End diff --
We cannot reference an outer variable and modify it inside the `map` function; the result is undefined. You need to create the `logphatPartOption` object inside the `map` function, like:
```
val localOptimizeDocConcentration = optimizeDocConcentration
batch.mapPartitions { docs =>
  ...
  val logphatPartOption =
    if (localOptimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
  ...
}
```
Also, avoid using `optimizeDocConcentration` directly inside the `rdd.map` function: it is a class member, so referencing it causes the whole class object to be serialized.
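The two points in the comment above (copy class members into local `val`s, and create the mutable accumulator inside the per-partition function) can be sketched without Spark. The example below is a local simulation under stated assumptions: `Seq`s stand in for partitions, `Array[Double]` stands in for `BDV[Double]`, and `Optimizer`, `partitionFn` and `sharedAccumulatorFn` are hypothetical names. In a real Spark job the closure is serialized and shipped to executors, so the shared-accumulator variant would behave differently again from this local run, which is why the comment calls the result undefined.

```scala
// Illustrative sketch only; plain Scala, no Spark dependency.
object FreshAccumulatorSketch {
  // Hypothetical stand-in for the optimizer class; not Spark's OnlineLDAOptimizer.
  class Optimizer(val optimizeDocConcentration: Boolean, val k: Int) {

    // Recommended shape: fields are copied to locals first, so the returned
    // function refers only to those locals and never to `this`.
    def partitionFn: Seq[Array[Double]] => Option[Array[Double]] = {
      val localOptimize = optimizeDocConcentration
      val localK = k
      docs => {
        // Fresh accumulator on every call, i.e. one per partition.
        val acc = if (localOptimize) Some(Array.fill(localK)(0.0)) else None
        docs.foreach(d => acc.foreach(a => for (i <- a.indices) a(i) += d(i)))
        acc
      }
    }

    // Anti-pattern for comparison: a single accumulator created outside the
    // function and mutated by every call.
    def sharedAccumulatorFn: Seq[Array[Double]] => Option[Array[Double]] = {
      val shared = if (optimizeDocConcentration) Some(Array.fill(k)(0.0)) else None
      docs => {
        docs.foreach(d => shared.foreach(a => for (i <- a.indices) a(i) += d(i)))
        shared
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(Array(1.0, 2.0)), Seq(Array(10.0, 20.0)))
    val opt = new Optimizer(optimizeDocConcentration = true, k = 2)

    // Each partition reports only its own sums: (1, 2) and (10, 20).
    val fresh = partitions.map(opt.partitionFn).map(_.get.toSeq)
    // Both results alias the same array, so each one holds (11, 22).
    val shared = partitions.map(opt.sharedAccumulatorFn).map(_.get.toSeq)

    println(fresh)
    println(shared)
  }
}
```

Merging the `shared` results would count every document twice in this local run, whereas the `fresh` results can be summed safely. The later revision quoted at the top of this thread takes a similar route with a `() => Option[...]` factory that is invoked inside `mapPartitions`.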
GitHub user akopich opened a pull request: https://github.com/apache/spark/pull/18924 [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver Hi, as proposed by Joseph K. Bradley, `gammat` is no longer collected to the driver. You can merge this pull request into a Git repository by running: $ git pull https://github.com/akopich/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18924 commit f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc Author: Valeriy Avanesov Date: 2017-08-11T16:28:38Z [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver gammat are not collected to a local matrix but rather represented as RDD[BDV[Double]] and are aggregated in a distributed manner