[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18924


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-06 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143159334
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

I see now. Thank you. But it seems like the style guide suggests moving both
of the parameters to a new line.
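For concreteness, a minimal sketch of that layout (my reading of the guide; the tuple types are the ones from the diff above, with `BDM`/`BDV` being the usual breeze aliases):

```
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

// Both parameters moved off the opening line, per the style-guide reading above.
val elementWiseSum = (
    u: (BDM[Double], Option[BDV[Double]], Long),
    v: (BDM[Double], Option[BDV[Double]], Long)) => {
  u._1 += v._1                // sum sufficient statistics in place
  u._2.foreach(_ += v._2.get) // sum logphat parts only when they are present
  (u._1, u._2, u._3 + v._3)   // add up the non-empty document counts
}
```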


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143112965
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

```
val elementWiseSum = (u: (BDM[Double], Option[BDV[Double]], Long),
                      v: (BDM[Double], Option[BDV[Double]], Long)) => {
```


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143084875
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
-   * current mini-batch. Uses Newton-Raphson method.
+   * Update alpha based on `logphat`.
+   * Uses Newton-Raphson method.
* @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
*  (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+   * @param logphat Expectation of estimated log-posterior distribution of
+   *topics in a document averaged over the batch.
+   * @param nonEmptyDocsN number of non-empty documents
*/
-  private def updateAlpha(gammat: BDM[Double]): Unit = {
+  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = {
--- End diff --

Otherwise the callers would have to cast `nonEmptyDocsN: Int` to `Double`. This
way we get the conversion implicitly, but the method is private, so I don't
think it's going to hurt.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143084656
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

Do you mean the extra spaces after `u` and `v`?


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143081342
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}

+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
--- End diff --

I don't think that's the case here. But as long as all the cleanup work is 
done, I would not mind it.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080051
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
--- End diff --

Not a style expert myself. Just what I would use:
```
val logphatPartOptionBase = () => {
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
}
```


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080481
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+Some(BDV.zeros[Double](k))
+  } else {
+None
+  }
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
--- End diff --

Minor: indent.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143080675
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
-   * current mini-batch. Uses Newton-Raphson method.
+   * Update alpha based on `logphat`.
+   * Uses Newton-Raphson method.
* @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
*  (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+   * @param logphat Expectation of estimated log-posterior distribution of
+   *topics in a document averaged over the batch.
+   * @param nonEmptyDocsN number of non-empty documents
*/
-  private def updateAlpha(gammat: BDM[Double]): Unit = {
+  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = {
--- End diff --

nonEmptyDocsN: Int ?


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143077626
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

I see. 


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143069049
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

About `logphatPartOptionBase`: I tried that initially and it failed. This was
discussed above with @WeichenXu123. The problem is caused by in-place
modifications.
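To spell out the failure mode: if the captured value were a single `Some(vector)`, every partition's in-place `+=` would hit the same object. A self-contained sketch (the name `logphatPartOptionBase` is from the diff; the vector size is illustrative):

```
import breeze.linalg.{DenseVector => BDV}

// Factory: every call hands out a fresh accumulator, so each partition
// (and the treeAggregate zero value) sums into its own vector.
val logphatPartOptionBase = () => Some(BDV.zeros[Double](3))

val partA = logphatPartOptionBase()
val partB = logphatPartOptionBase()
partA.foreach(_ += BDV(1.0, 2.0, 3.0))    // in-place update of partition A's sum
assert(partB.get == BDV.zeros[Double](3)) // partition B's accumulator is untouched
```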


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143068229
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

About the comments. Keep it as you wish.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143067455
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

Maybe define logphatPartOptionBase as an Option rather than a function:
```
val logphatPartOptionBase = if (optimizeDocConcentration) {
  Some(BDV.zeros[Double](k))
```


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143066229
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}

+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
--- End diff --

But the Spark Scala style guide says:

"... return is preferred: Use `return` as a guard to simplify control flow
without adding a level of indentation."
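A minimal sketch of the guard style that quote describes (hypothetical class; the `return this` mirrors the diff above):

```
class OptimizerSketch {
  def submitMiniBatch(nonEmptyDocsN: Long): this.type = {
    if (nonEmptyDocsN == 0) {
      // Guard: an empty batch teaches us nothing; bail out here instead of
      // wrapping the whole remainder of the method in an else branch.
      return this
    }
    // ... lambda and alpha updates would follow here ...
    this
  }
}
```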


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143064794
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}

+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
+  logWarning("No non-empty documents were submitted in the batch.")
+  // Therefore, there is no need to update any of the model parameters
+  return this
+}
+
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
+logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+expElogbetaBc.destroy(false)
--- End diff --

Great point. Thank you. Moreover, it should be destroyed as soon as it is no
longer needed.
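A minimal sketch of that lifecycle (the local-mode boilerplate is mine; `destroy(false)` is the non-blocking call used in the diff):

```
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("bc-lifetime"))
val expElogbetaBc = sc.broadcast(Array(0.1, 0.2, 0.3))

// Last action that dereferences the broadcast on the executors.
val total = sc.parallelize(1 to 4).map(i => expElogbetaBc.value(i % 3)).sum()

// Destroy immediately after the final use, on every code path, rather than
// leaving the blocks around until the context shuts down.
expElogbetaBc.destroy(false)
```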


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143060674
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
--- End diff --

Thanks. Fixed.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143060537
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

If I don't assign `logphatPartOptionBase` to a local variable, a
`NotSerializableException` is thrown.

Regarding the comments: isn't it necessary to emphasise that the computation
happens in the same pass?
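For readers following along, a sketch of why the local `val` matters (hypothetical driver-side class; Spark surfaces this as a "Task not serializable" error):

```
import org.apache.spark.SparkContext

class DriverSide(sc: SparkContext) { // the enclosing instance is not Serializable
  val optimizeDocConcentration = true

  def bad(): Long =
    // `optimizeDocConcentration` is really `this.optimizeDocConcentration`,
    // so the closure drags the whole non-serializable instance into the task.
    sc.parallelize(1 to 10).filter(_ => optimizeDocConcentration).count()

  def good(): Long = {
    val localFlag = optimizeDocConcentration // capture just the Boolean
    sc.parallelize(1 to 10).filter(_ => localFlag).count()
  }
}
```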


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143056727
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
--- End diff --

indent


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143057944
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}

+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
--- End diff --

I would use

```
if (nonEmptyDocsN > 0) {
  update ...
} else {
  logWarning...
}
this
```

Just to avoid multiple exits.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143055573
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
--- End diff --

Thinking again, `logphatPartOptionBase` should have been evaluated on the
driver side. I'm not sure assigning `optimizeDocConcentration` to a local
variable is necessary here.

The comments can be simpler: calculate logphat only if optimizeDocConcentration
is true, to update alpha.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143058244
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}

+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
+
+if (nonEmptyDocsN == 0) {
+  logWarning("No non-empty documents were submitted in the batch.")
+  // Therefore, there is no need to update any of the model parameters
+  return this
+}
+
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
+logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+expElogbetaBc.destroy(false)
--- End diff --

expElogbetaBc should always be destroyed. 
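One way to make that guarantee structural (a hypothetical helper, not what the PR does):

```
import org.apache.spark.broadcast.Broadcast

// Run `body` with the broadcast, then destroy it even if `body` throws,
// so no code path can leak the broadcast blocks.
def withBroadcast[T, R](bc: Broadcast[T])(body: Broadcast[T] => R): R =
  try body(bc) finally bc.destroy(false)
```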


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-05 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r143003890
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
--- End diff --

Thanks for the comments, @jkbradley and @hhbyyh. The check is added. I have
also added a warning message for the case of an "empty" batch. I believe a
user should know that such a thing happened.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142833499
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

agree.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142833374
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

Sure, since we're talking about consistency with old LDA. It's fine to keep 
using batchSize here.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142831316
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

This can wait.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142826379
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

That sounds right to me.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142826326
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
-
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+val optimizeDocConcentration = this.optimizeDocConcentration
+// If and only if optimizeDocConcentration is set true,
+// we calculate logphat in the same pass as other statistics.
+// No calculation of logphat happens otherwise.
+val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+  Some(BDV.zeros[Double](k))
+} else {
+  None
+}
+
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+  v : (BDM[Double], Option[BDV[Double]], Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )

+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
--- End diff --

Good point about dividing by 0, @hhbyyh. We should probably just check nonEmptyDocsN to see if it's 0, and if it is, skip all of these updates. That's related to, but actually separate from, the follow-up SPARK-22111.
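
(For illustration, a minimal sketch of the guard being suggested — the names follow the diff above, but the exact placement is my assumption, not the PR's final code:)

```
// Sketch: skip the whole mini-batch update when it contained no non-empty docs.
if (nonEmptyDocsN > 0) {
  updateLambda(batchResult, batchSize)
  logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)  // divisor is now known to be > 0
  logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
}
// else: leave lambda and alpha unchanged for this mini-batch
```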


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142632240
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Thanks for the good point. Do I understand correctly that if a batch 
without any non-empty docs is submitted, the `submitMiniBatch` method shouldn't 
change the state of `LDAOptimizer`? 
cc @WeichenXu123 


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142625490
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+expElogbetaBc.destroy(false)
+
 this
   }
 
   /**
-   * Update lambda based on the batch submitted. batchSize can be 
different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be 
different for each iteration.
--- End diff --

Thanks. Comment reverted. 


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624984
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

I believe this will be settled in SPARK-22111.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624246
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
--- End diff --

Thanks. Fixed.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624340
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
--- End diff --

Thanks. Fixed.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142624093
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
--- End diff --

Thanks. Fixed.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142622117
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
--- End diff --

Thanks. Fixed.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-04 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142620788
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
--- End diff --

This line is necessary to avoid serializing `LDASuite`, which is not serializable.
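
To illustrate the pattern with a self-contained sketch (everything here — `FieldCaptureDemo` and its members — is a hypothetical stand-in, not PR code): copying the field into a local `val` makes the closure capture just a `Boolean`, while reading the field directly captures `this` and forces Spark to serialize the whole enclosing object.

```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for a non-serializable enclosing class.
class FieldCaptureDemo(sc: SparkContext) {
  private val optimizeDocConcentration: Boolean = true

  def countWithLocalCopy(): Long = {
    val localFlag = optimizeDocConcentration  // closure captures only a Boolean
    sc.parallelize(1 to 100).filter(_ => localFlag).count()
  }

  def countWithFieldAccess(): Long = {
    // Reading the field inside the closure captures `this`, so Spark tries to
    // serialize FieldCaptureDemo (including its SparkContext) and the job
    // typically fails with "Task not serializable".
    sc.parallelize(1 to 100).filter(_ => optimizeDocConcentration).count()
  }
}
```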


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571627
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
--- End diff --

indent


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142572013
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

Instead of batchSize, I think we may use nonEmptyDocsN directly. @jkbradley, please double-check, since it will be a behavior change.
Also please note that nonEmptyDocsN can be zero, so be careful about dividing by 0.



---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142574222
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+expElogbetaBc.destroy(false)
+
 this
   }
 
   /**
-   * Update lambda based on the batch submitted. batchSize can be 
different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be 
different for each iteration.
--- End diff --

Comments should be consistent with the code. Update the code or revert the comment change.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142574453
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase(), 0L))(
+elementWiseSum, elementWiseSum
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Should use nonEmptyDocsN to be consistent with the original implementation, and also to avoid dividing by 0.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571342
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
--- End diff --

If it's only used once, reassigning it to a local variable is not necessary.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571728
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  (u._1, u._2, u._3 + v._3)
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], 
nonEmptyDocsN : Long) = stats
--- End diff --

extra space after nonEmptyDocsN 


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571603
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
--- End diff --

The comment is not that accurate. If `optimizeDocConcentration==false`, 
logphat will not be calculated.
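
A tiny sketch of that if-and-only-if behaviour (breeze assumed on the classpath; the values are made up): when the flag is false the accumulator is `None`, so every `foreach` on it is a no-op and logphat is never computed.

```
import breeze.linalg.{DenseVector => BDV}

val k = 4
val optimizeDocConcentration = false

val logphatPart: Option[BDV[Double]] =
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None

logphatPart.foreach(_ += BDV.ones[Double](k))  // skipped entirely when None
println(logphatPart)                           // None
```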


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-10-03 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r142571685
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase()
+  var nonEmptyDocCount : Long = 0L
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+nonEmptyDocCount += 1
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption, nonEmptyDocCount))
+}
+
+val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+ v : (BDM[Double], Option[BDV[Double]], 
Long)) => {
--- End diff --

indent


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-23 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140630215
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = 
batch.mapPartitions { docs =>
--- End diff --

Done.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140621444
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = 
batch.mapPartitions { docs =>
--- End diff --

Let's use Long for the doc count, since it could overflow for large datasets and large values of miniBatchFraction.
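
Concretely (made-up numbers, just to show the two Int failure modes in play here):

```
val corpusSize = 3000000000L               // 3e9 docs, more than Int.MaxValue
println(corpusSize.toInt)                  // -1294967296: Long-to-Int wraps around
println((0.9 * corpusSize).ceil.toInt)     // 2147483647: Double-to-Int silently clamps
// Either way the count is wrong; keeping it a Long stays exact.
```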


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140249325
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

I think it doesn't matter if `seq` does not return a reference to the left parameter but returns a new object instead.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140199136
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

Please check out the updated PR.

I have added `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`.
Unfortunately, the aggregation can no longer be done in a purely in-place manner, since `Int` is immutable. That shouldn't be a big deal, though, since the matrices and vectors are still updated in place.
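
A small REPL-style sketch of that accumulator shape (breeze assumed on the classpath; a plain `reduce` over a local Seq stands in for `treeAggregate`): the matrix and the optional vector are summed in place, while the tuple itself is rebuilt only because the count is an immutable primitive.

```
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

type Acc = (BDM[Double], Option[BDV[Double]], Long)

val elementWiseSum = (u: Acc, v: Acc) => {
  u._1 += v._1                 // matrix summed in place
  u._2.foreach(_ += v._2.get)  // vector summed in place when present
  (u._1, u._2, u._3 + v._3)    // new tuple, only because the count is immutable
}

val k = 2; val vocabSize = 3
val partitionStats: Seq[Acc] =
  Seq.fill(4)((BDM.ones[Double](k, vocabSize), Some(BDV.ones[Double](k)), 1L))

val (statsSum, logphatOption, nonEmptyDocsN) = partitionStats.reduce(elementWiseSum)
println(nonEmptyDocsN)  // 4
```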


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140193380
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

But should we have `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] 
= batch.mapPartitions {...}` stuff? 


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140188363
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

I think it's OK. Let's keep the logic exactly the same as in the old version.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140183412
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

Or, another suggestion: let's have something like
`val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`, where the `Int` stands for the number of non-empty documents in a partition.

What do you think?


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-21 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140180799
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

@WeichenXu123, you are right. So should we add `stats.count()`, or should we instead embed the counting in the aggregation phase so that we avoid a second pass?


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140111453
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

There's a small difference between the old `N` and `batchSize`: the old `N` does not count empty docs.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140032198
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

I also renamed `N` to `batchSize`, which is what it actually is.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140031900
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
--- End diff --

OK.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140030582
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
--- End diff --

Add an inline doc note here:
"We calculate logphat in the same pass as other statistics, but we only 
need it if we are optimizing docConcentration."


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139832997
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

Same with N


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139832967
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }
 
   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for 
documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

Document what logphat is
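
(For what it's worth, a sketch of what such a doc could say — my paraphrase of the quantity the diff computes, with a hypothetical signature, not the PR's final wording:)

```
/**
 * Update alpha based on `logphat`.
 *
 * logphat is the average of E_q[log(theta_d)] over the non-empty documents d
 * in the mini-batch, i.e. the sum of LDAUtils.dirichletExpectation(gammad)
 * over those documents, divided by their count. Uses the Newton-Raphson
 * method, as before.
 */
private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN: Long): Unit = ???
```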


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139514402
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption))
+}
+
+val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]),
+ v : (BDM[Double], Option[BDV[Double]])) 
=> {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  u
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase))(
+elementWiseSumInPlace, elementWiseSumInPlace
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, batchSize))
+
+expElogbetaBc.destroy(false)
+stats.unpersist()
--- End diff --

Do you mean `stats.unpersist()`? Sure, I got rid of it.
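
(For context, why the persist could go too: the old code ran two actions over `stats`, the new code runs exactly one — a comment-only sketch of the before/after shape, condensed from the diffs above:)

```
// Before: two actions over the same RDD, so caching it paid off.
//   val statsSum = stats.map(_._1).treeAggregate(...)(_ += _, _ += _)  // pass 1
//   val gammat = DenseMatrix.vertcat(stats.map(_._2)...collect()...)   // pass 2
// After: a single treeAggregate consumes stats once, so persist()/unpersist()
// would only add overhead.
//   val (statsSum, logphatOption, nonEmptyDocsN) = stats.treeAggregate(...)(...)
```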


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139514301
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
--- End diff --

Great point. Thank you.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139467949
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
   nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
 val (gammad, sstats, ids) = 
OnlineLDAOptimizer.variationalTopicInference(
   termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-gammaPart = gammad :: gammaPart
+stat(::, ids) := stat(::, ids) + sstats
+logphatPartOption.foreach(_ += 
LDAUtils.dirichletExpectation(gammad))
   }
-  Iterator((stat, gammaPart))
-}.persist(StorageLevel.MEMORY_AND_DISK)
-val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-  _ += _, _ += _)
-val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-  stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
-stats.unpersist()
-expElogbetaBc.destroy(false)
-val batchResult = statsSum *:* expElogbeta.t
+  Iterator((stat, logphatPartOption))
+}
+
+val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]),
+ v : (BDM[Double], Option[BDV[Double]])) 
=> {
+  u._1 += v._1
+  u._2.foreach(_ += v._2.get)
+  u
+}
+
+val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats
+  .treeAggregate((BDM.zeros[Double](k, vocabSize), 
logphatPartOptionBase))(
+elementWiseSumInPlace, elementWiseSumInPlace
+  )
 
+val batchResult = statsSum *:* expElogbeta.t
 // Note that this is an optimization to avoid batch.count
-updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-if (optimizeDocConcentration) updateAlpha(gammat)
+val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+updateLambda(batchResult, batchSize)
+
+logphatOption.foreach(_ /= batchSize.toDouble)
+logphatOption.foreach(updateAlpha(_, batchSize))
+
+expElogbetaBc.destroy(false)
+stats.unpersist()
--- End diff --

This line is useless.


---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-18 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r139470472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val logphatPartOptionBase = if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]])] = 
batch.mapPartitions { docs =>
   val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
   val stat = BDM.zeros[Double](k, vocabSize)
-  var gammaPart = List[BDV[Double]]()
+  val logphatPartOption = logphatPartOptionBase
--- End diff --

We cannot reference an outer variable and modify it in a `map` function; that would produce an undefined result.
You need to create the `logphatPartOption` object inside the `map` function, like:
```
val localOptimizeDocConcentration = optimizeDocConcentration
batch.mapPartitions { docs =>
  ...
  val logphatPartOption = if (localOptimizeDocConcentration) 
Some(BDV.zeros[Double](k)) else None
  ...
```
Also note: avoid using `optimizeDocConcentration` directly in the `rdd.map` function, because the var is a class member and referencing it will cause the whole class object to be serialized.




---




[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-08-11 Thread akopich
GitHub user akopich opened a pull request:

https://github.com/apache/spark/pull/18924

[SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each 
doc in mini-batch to driver

Hi, 

As proposed by Joseph K. Bradley, gammat is no longer collected to the driver.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akopich/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18924


commit f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc
Author: Valeriy Avanesov 
Date:   2017-08-11T16:28:38Z

[SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each 
doc in mini-batch to driver

gammat are not collected to a local matrix but rather represented as 
RDD[BDV[Double]] and are aggregated in a distributed manner




---