[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14335


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-25 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72014278
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Hmm... `DenseVector.toDenseMatrix` needs to copy the whole buffer of the 
vector, so there may be some performance impact if all the copies are done 
on the driver side.
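
The copy WeichenXu123 describes can be sketched in plain Scala (no Breeze here; `clone()` stands in for the buffer copy that `toDenseMatrix` would make):

```scala
// Stand-in for DenseVector.toDenseMatrix: producing a one-row matrix
// copies the underlying buffer rather than sharing it.
val vec = Array(1.0, 2.0, 3.0)
val matRow = vec.clone() // the per-vector copy, O(length) work and memory

vec(0) = 99.0 // mutating the original vector...
assert(matRow(0) == 1.0) // ...does not affect the copied row

// Doing this for every row on the driver serializes O(rows * length) copying
// that could otherwise happen in parallel on the executors.
```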





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72013723
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Let's leave it as-is if in doubt. I figured it's better to do work in 
parallel if possible, but the downside is unclear.





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-25 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72013619
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

`stats.map(_._2).flatMap(list => list).map(_.toDenseMatrix).collect()` would 
also work, but I think the two approaches differ little in efficiency, 
because `stats.map(_._2).flatMap(list => list)` already produces an 
RDD[DenseVector].
Does serializing a `DenseVector` versus a one-row `DenseMatrix` make a big 
difference?





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72011858
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

Oh, I mean, it's not always clear that it's OK to use the memory/disk, but I 
think it's justified here. Yes, it will serialize.





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72011821
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Yes, I'm wondering why not do this in a distributed way; am I missing why 
it's only done serially on the driver? Is it that the dense matrix class 
doesn't serialize, or doesn't serialize well?





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72003627
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
+stats.unpersist(false)
--- End diff --

@srowen yeah, here we'd better make it consistent with the rest of the code.





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72003530
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

@srowen
You mean change
`stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix)`
to
`stats.map(_._2).flatMap(list => list).map(_.toDenseMatrix).collect()`
?

Will the latter run faster?
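
The two orderings produce the same rows; in Spark the difference is only *where* the per-row conversion runs and what gets serialized back. A plain-Scala stand-in (no Spark; `toList` plays the role of `collect()`, and a hypothetical one-row nested `Vector` stands in for Breeze's `DenseMatrix`):

```scala
// Stand-in data: (stat, gammaPart) pairs, shaped like submitMiniBatch's `stats`.
val stats = List(
  (0, List(Vector(1.0, 2.0))),
  (1, List(Vector(3.0, 4.0), Vector(5.0, 6.0)))
)

// Hypothetical stand-in for DenseVector.toDenseMatrix: wrap as a 1-row matrix.
def toDenseMatrix(v: Vector[Double]): Vector[Vector[Double]] = Vector(v)

val rows = stats.map(_._2).flatMap(list => list)

// "collect" first, then convert on the driver:
val onDriver = rows.toList.map(toDenseMatrix)
// convert on the executors, then "collect":
val onExecutors = rows.map(toDenseMatrix).toList

// Same result either way; in Spark only the copy location and the
// serialized type (DenseVector vs one-row DenseMatrix) differ.
assert(onDriver == onExecutors)
```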





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r72003428
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

@srowen The type of the RDD to be persisted here is fixed to 
RDD[(BDM[Double], List[BDV[Double]])], so what's the risk that it cannot be 
persisted?





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r71990265
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
+stats.unpersist(false)
--- End diff --

I tend to agree with not blocking, though lots of code uses the default of 
blocking for the RDD's removal. I don't know whether to prefer consistency or 
what. I'm neutral.





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r71990246
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
 val statsSum: BDM[Double] = 
stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
   _ += _, _ += _)
-expElogbetaBc.unpersist()
 val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
   stats.map(_._2).flatMap(list => 
list).collect().map(_.toDenseMatrix): _*)
--- End diff --

`.map(_._1)` above should be `.keys`; `.map(_._2)` can be `.values`; `list 
=> list` can be `identity`.
I wonder why this collects and then turns things into a dense matrix; can 
that be done non-locally?
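
The small idioms srowen points out can be checked in plain Scala (ordinary collections here; on a pair RDD the equivalent shorthands are `.keys` for `.map(_._1)` and `.values` for `.map(_._2)`):

```scala
// Shaped like `stats`: a collection of (stat, gammaPart) pairs.
val stats = List((10, List("a", "b")), (20, List("c")))

// On a pair RDD, .keys ≡ .map(_._1) and .values ≡ .map(_._2).
val keys = stats.map(_._1)
val values = stats.map(_._2)

// `flatMap(list => list)` is just `flatMap(identity)`.
val flattened = values.flatMap(identity)

assert(keys == List(10, 20))
assert(flattened == List("a", "b", "c"))
assert(values.flatMap(list => list) == flattened)
```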





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14335#discussion_r71990232
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 gammaPart = gammad :: gammaPart
   }
   Iterator((stat, gammaPart))
-}
+}.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

It's a little risky to assume this can be persisted, but I think it's a good 
idea unless it can be optimized into one pass.





[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...

2016-07-24 Thread WeichenXu123
GitHub user WeichenXu123 opened a pull request:

https://github.com/apache/spark/pull/14335

[SPARK-16697][ML][MLLib] improve LDA submitMiniBatch method to avoid 
redundant RDD computation

## What changes were proposed in this pull request?

In `LDAOptimizer.submitMiniBatch`, persist `stats: RDD[(BDM[Double], 
List[BDV[Double]])]` to avoid recomputing it,
move the unpersisting of the `expElogbetaBc` broadcast variable so that it 
is not unpersisted too early,
and change the previous `expElogbetaBc.unpersist()` into 
`expElogbetaBc.destroy(false)`.
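
The motivation for the persist call can be sketched without Spark: `stats` feeds two actions (the `treeAggregate` and the `collect`), and an unpersisted RDD re-runs its lineage for every action. A plain-Scala analogy, where a counter stands in for the recomputed lineage:

```scala
var lineageRuns = 0
// Stand-in for the mapPartitions lineage that produces `stats`.
def computeStats(): List[Int] = { lineageRuns += 1; List(1, 2, 3) }

// Without persist: each "action" recomputes the lineage,
// as two passes over an unpersisted RDD would.
val sumPass = computeStats().sum
val maxPass = computeStats().max
val runsWithoutPersist = lineageRuns

lineageRuns = 0
// With persist: materialize once (~ stats.persist(StorageLevel.MEMORY_AND_DISK)),
// then both "actions" reuse the cached result.
val persisted = computeStats()
val sumPass2 = persisted.sum
val maxPass2 = persisted.max
val runsWithPersist = lineageRuns

assert(runsWithoutPersist == 2 && runsWithPersist == 1)
```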

## How was this patch tested?

Existing test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/WeichenXu123/spark improve_LDA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14335.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14335


commit e5ed33b559a04215c784d0d81a1578d3f13d8804
Author: WeichenXu 
Date:   2016-07-20T16:39:27Z

improve_LDA



