[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14335 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72014278

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Hmm... `DenseVector.toDenseMatrix` needs to copy the whole buffer in the vector, so there may be some performance impact if all of the copies are done on the driver side.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72013723

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Let's leave it if in doubt. I figured it's better to do work in parallel if possible, but the downside is unclear.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72013619

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

`stats.map(_._2).flatMap(list => list).map(_.toDenseMatrix).collect()` would also work, but I don't think the two approaches differ much in efficiency, because `stats.map(_._2).flatMap(list => list)` already produces an `RDD[DenseVector]`.

Is there a big difference between serializing a `DenseVector` and serializing a one-row `DenseMatrix`?
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72011858

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

Oh, I mean, it's not always clear that it's OK to use the memory/disk, but I think it's justified here. Yes, it will serialize.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72011821

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

Yes, I'm wondering why not do this in a distributed way; am I missing why it's only done serially on the driver? Is it that the dense matrix class doesn't serialize, or doesn't serialize well?
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72003627

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
+    stats.unpersist(false)
--- End diff --

@srowen Yeah, we'd better make this consistent with the other call sites.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72003530

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

@srowen Do you mean changing
`stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix)`
to
`stats.map(_._2).flatMap(list => list).map(_.toDenseMatrix).collect()`?
Will the latter run faster?
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r72003428

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

@srowen The type of the RDD being persisted here is fixed: `RDD[(BDM[Double], List[BDV[Double]])]`. So what is the risk that it cannot be persisted?
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r71990265

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
+    stats.unpersist(false)
--- End diff --

I tend to agree with not blocking, though lots of code uses the default of blocking for the RDD's removal. I don't know whether to prefer consistency or what. I'm neutral.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r71990246

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
--- End diff --

`.map(_._1)` above should be `.keys`; `.map(_._2)` can be `.values`; `list => list` can be `identity`. I wonder why this collects and then turns things into a dense matrix; can that be done non-locally?
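The shorthand suggested in this comment can be checked on a toy pair RDD. The following is a minimal sketch, assuming Spark is on the classpath and run in local mode; the object name `PairRddShorthand`, the sample data, and the use of `Int` lists in place of Breeze vectors are illustrative assumptions and are not taken from the PR.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PairRddShorthand {
  /** Evaluates the long and short spellings on a tiny RDD and returns all four results. */
  def run(sc: SparkContext): (Seq[Double], Seq[Double], Seq[Int], Seq[Int]) = {
    // Hypothetical stand-in for `stats`: one (statistic, list of values) pair per record.
    val stats = sc.parallelize(Seq((1.0, List(1, 2)), (2.0, List(3))), numSlices = 2)

    val firstsLong  = stats.map(_._1).collect().toSeq   // spelling used in the diff
    val firstsShort = stats.keys.collect().toSeq        // suggested shorthand
    val flatLong    = stats.map(_._2).flatMap(list => list).collect().toSeq
    val flatShort   = stats.values.flatMap(identity).collect().toSeq
    (firstsLong, firstsShort, flatLong, flatShort)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("pair-rdd-shorthand")
    val sc = new SparkContext(conf)
    try {
      val (firstsLong, firstsShort, flatLong, flatShort) = run(sc)
      println(s"keys match: ${firstsLong == firstsShort}, values match: ${flatLong == flatShort}")
    } finally {
      sc.stop() // always release the local Spark context
    }
  }
}
```

`keys` and `values` are available on any RDD of pairs through Spark's implicit pair-RDD functions, so the change is purely one of readability; it does not alter how many passes are made over `stats`.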
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14335#discussion_r71990232

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

It's a little risky to assume this can be persisted, but I think it's a good idea unless it can be optimized into one pass.
[GitHub] spark pull request #14335: [SPARK-16697][ML][MLLib] improve LDA submitMiniBa...
GitHub user WeichenXu123 opened a pull request:

    https://github.com/apache/spark/pull/14335

    [SPARK-16697][ML][MLLib] improve LDA submitMiniBatch method to avoid redundant RDD computation

    ## What changes were proposed in this pull request?

    In `LDAOptimizer.submitMiniBatch`, persist `stats: RDD[(BDM[Double], List[BDV[Double]])]` so that it is not computed more than once, and move the point at which the `expElogbetaBc` broadcast variable is unpersisted so that it is not released too early. Also change the previous `expElogbetaBc.unpersist()` to `expElogbetaBc.destroy(false)`.

    ## How was this patch tested?

    Existing test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/WeichenXu123/spark improve_LDA

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14335.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14335

commit e5ed33b559a04215c784d0d81a1578d3f13d8804
Author: WeichenXu
Date:   2016-07-20T16:39:27Z

    improve_LDA
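The redundant computation this PR targets arises because `stats` feeds two separate actions (the `treeAggregate` and the `collect`), and an unpersisted RDD is recomputed for each one. The sketch below is a plain-Scala analogy rather than Spark code: a function that returns a fresh lazy iterator plays the role of the RDD lineage, and materializing it once plays the role of `persist`. The object name `RecomputeVsCache`, the `expensive` function, and the sample data are hypothetical.

```scala
object RecomputeVsCache {
  /** Returns how many times the per-partition function ran across the two passes. */
  def evaluations(cacheFirst: Boolean): Int = {
    var count = 0

    // Hypothetical stand-in for the per-partition work done inside mapPartitions.
    def expensive(part: Int): (Double, List[Double]) = {
      count += 1
      (part.toDouble, List(part * 1.0, part * 2.0))
    }

    val partitions = Vector(1, 2, 3)

    // Each call yields a fresh lazy iterator, so each pass re-runs `expensive`,
    // much like an action on an unpersisted RDD re-runs its lineage.
    val lineage: () => Iterator[(Double, List[Double])] =
      () => partitions.iterator.map(expensive)

    // Materializing the results once plays the role of persist().
    val source: () => Iterator[(Double, List[Double])] =
      if (cacheFirst) {
        val cached = lineage().toVector
        () => cached.iterator
      } else lineage

    val statsSum = source().map(_._1).sum       // first pass (treeAggregate in the PR)
    val gammas = source().flatMap(_._2).toList  // second pass (collect in the PR)
    require(statsSum == 6.0 && gammas.size == 6)
    count
  }

  def main(args: Array[String]): Unit = {
    println(s"uncached: ${evaluations(cacheFirst = false)} evaluations") // 6
    println(s"cached:   ${evaluations(cacheFirst = true)} evaluations")  // 3
  }
}
```

Without caching, the three partitions are processed twice (six evaluations); with caching they are processed once. In Spark the cached data also occupies executor memory or disk, which is the trade-off raised in the review comments above.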