[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15413





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-09 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95302720
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense/sparse data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+
+    Seq(denseDataset, sparseDataset).foreach { dataset =>
+      val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
+      modelEquals(expected, actual)
+    }
+  }
+
+  test("check distributed decomposition") {
--- End diff --

Yeah, I also suffer from bad initialization in some of my use cases, so I think we should push to get [SPARK-15785](https://issues.apache.org/jira/browse/SPARK-15785) committed first. It will be easier to add a correctness test once we support an initial model. I'll leave this as a follow-up and have opened [SPARK-19144](https://issues.apache.org/jira/browse/SPARK-19144) to track it. Thanks.
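
(For illustration only, a minimal sketch of the correctness test that becomes possible once SPARK-15785 lands. setInitialModel is a hypothetical setter that SPARK-15785 would add, not an existing API; weights, gaussians, expected and modelEquals are the fixtures used elsewhere in this suite.)

    // Hypothetical sketch: setInitialModel does NOT exist in this PR.
    // With a deterministic starting point, EM no longer depends on the
    // random initialization, so the fitted model can be compared directly
    // against known cluster parameters.
    val initialModel = new GaussianMixtureModel("init", weights, gaussians)
    val gm = new GaussianMixture()
      .setK(2)
      .setInitialModel(initialModel) // hypothetical API from SPARK-15785
      .setMaxIter(100)
    modelEquals(expected, gm.fit(denseDataset))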





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-09 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95195793
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense/sparse data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+
+    Seq(denseDataset, sparseDataset).foreach { dataset =>
+      val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
+      modelEquals(expected, actual)
+    }
+  }
+
+  test("check distributed decomposition") {
--- End diff --

I played with this a bit and wrote a test that generates two well-separated clusters and runs the fit with distributed computation, so that we could check the model learns approximately the correct cluster means. However, I found that the algorithm seems incapable of learning even this very contrived example, due to the initialization method.

I still think it's a good test to have, but if you feel strongly against it then let's leave it out. Otherwise it could be a follow-up (along with more investigation into the initialization method, which does not seem to be effective). A sketch of the data generation is below.
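
(A rough sketch of the contrived data, with assumed fixture names: seed comes from this suite. The centers are 20 standard deviations apart, so a correct distributed fit should recover them easily.)

    import scala.util.Random
    import org.apache.spark.ml.linalg.Vectors

    // Two tight, widely separated 100-dimensional clusters: centers at -10
    // and +10 in every coordinate, with unit-variance noise around each.
    val rng = new Random(seed)
    val separatedData = Seq.tabulate(100) { i =>
      val center = if (i % 2 == 0) -10.0 else 10.0
      Vectors.dense(Array.fill(100)(center + rng.nextGaussian()))
    }
    // With k = 2 and numFeatures = 100, shouldDistributeGaussians gives
    // (1.0 / 2.0) * 100 = 50 > 25, so the fit exercises the distributed M-step.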





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95076032
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -141,4 +242,37 @@ object GaussianMixtureSuite {
     "maxIter" -> 2,
     "tol" -> 0.01
   )
+
+  val denseData = Seq(
+    Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220),
+    Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118),
+    Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322),
+    Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026),
+    Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
+  )
+
+  val decompositionData: Seq[Vector] = Seq.tabulate(25) { i: Int =>
+    Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+  }
+
+  val rData = Seq(
+    Vectors.dense(-0.6264538, 0.1836433), Vectors.dense(-0.8356286, 1.5952808),
+    Vectors.dense(0.3295078, -0.8204684), Vectors.dense(0.4874291, 0.7383247),
+    Vectors.dense(0.5757814, -0.3053884), Vectors.dense(1.5117812, 0.3898432),
+    Vectors.dense(-0.6212406, -2.2146999), Vectors.dense(11.1249309, 9.9550664),
+    Vectors.dense(9.9838097, 10.9438362), Vectors.dense(10.8212212, 10.5939013),
+    Vectors.dense(10.9189774, 10.7821363), Vectors.dense(10.0745650, 8.0106483),
+    Vectors.dense(10.6198257, 9.9438713), Vectors.dense(9.8442045, 8.5292476),
+    Vectors.dense(9.5218499, 10.4179416)
+  )
+
+  case class FeatureData(features: Vector)
+
+  def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
+    assert(m1.weights.length === m2.weights.length)
+    for (i <- m1.weights.indices) {
+      assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
--- End diff --

Oops, forgot it, added. Thanks.
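
(For reference, a sketch of what the updated helper presumably looks like, with the per-component weight check added; the exact final code is in the PR.)

    def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
      assert(m1.weights.length === m2.weights.length)
      for (i <- m1.weights.indices) {
        assert(m1.weights(i) ~== m2.weights(i) absTol 1E-3)  // the added check
        assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
        assert(m1.gaussians(i).cov ~== m2.gaussians(i).cov absTol 1E-3)
      }
    }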





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95075824
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -141,4 +242,37 @@ object GaussianMixtureSuite {
     "maxIter" -> 2,
     "tol" -> 0.01
   )
+
+  val denseData = Seq(
+    Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220),
+    Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118),
+    Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322),
+    Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026),
+    Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
+  )
+
+  val decompositionData: Seq[Vector] = Seq.tabulate(25) { i: Int =>
+    Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+  }
+
+  val rData = Seq(
+    Vectors.dense(-0.6264538, 0.1836433), Vectors.dense(-0.8356286, 1.5952808),
+    Vectors.dense(0.3295078, -0.8204684), Vectors.dense(0.4874291, 0.7383247),
+    Vectors.dense(0.5757814, -0.3053884), Vectors.dense(1.5117812, 0.3898432),
+    Vectors.dense(-0.6212406, -2.2146999), Vectors.dense(11.1249309, 9.9550664),
+    Vectors.dense(9.9838097, 10.9438362), Vectors.dense(10.8212212, 10.5939013),
+    Vectors.dense(10.9189774, 10.7821363), Vectors.dense(10.0745650, 8.0106483),
+    Vectors.dense(10.6198257, 9.9438713), Vectors.dense(9.8442045, 8.5292476),
+    Vectors.dense(9.5218499, 10.4179416)
+  )
+
+  case class FeatureData(features: Vector)
+
+  def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
+    assert(m1.weights.length === m2.weights.length)
+    for (i <- m1.weights.indices) {
+      assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
--- End diff --

We have checked ```weights``` equality at L272. The ```weights``` array belongs to the model rather than to each cluster.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95075758
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense/sparse data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+
+    Seq(denseDataset, sparseDataset).foreach { dataset =>
+      val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
+      modelEquals(expected, actual)
+    }
+  }
+
+  test("check distributed decomposition") {
--- End diff --

This is because the model is big and it's tedious to construct the expected model in advance. In this model, ```gaussians``` (the array of ```MultivariateGaussian```) contains 5 elements, and each element holds a mean array of length 50 and a covariance matrix of size 50 * 50. A rough parameter count is sketched below.
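
(Just to make the size concrete, under the numbers above:)

    // 5 components, 50 features each:
    val k = 5
    val d = 50
    val meanEntries = k * d              // 250 mean values
    val packedCov = k * d * (d + 1) / 2  // 6375 values (upper triangles only)
    val fullCov = k * d * d              // 12500 values if stored densely
    // Far too many literals to hard-code as an expected model in a test.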





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95075604
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
--- End diff --

Sounds good, updated.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-07 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95064061
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense/sparse data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+
+    Seq(denseDataset, sparseDataset).foreach { dataset =>
+      val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
+      modelEquals(expected, actual)
+    }
+  }
+
+  test("check distributed decomposition") {
--- End diff --

This test only checks that when we distribute the computation it produces a model, i.e. that it doesn't fail. So, AFAICT we don't have any test right now that checks that when we distribute the computation it produces a _correct_ model. I think it's a good idea to have that here. (The fixture does at least trigger the distributed path; see the check below.)
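
(A minimal check that the decompositionData fixture, 25 points with 50 features, trips the heuristic from this PR; k = 5 is assumed, matching the suite.)

    // ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
    // With k = 5 and numFeatures = 50: (4.0 / 5.0) * 50 = 40 > 25, so the
    // per-cluster updates are computed distributedly.
    assert(GaussianMixture.shouldDistributeGaussians(numClusters = 5, numFeatures = 50))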





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-07 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95064361
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -141,4 +242,37 @@ object GaussianMixtureSuite {
     "maxIter" -> 2,
     "tol" -> 0.01
   )
+
+  val denseData = Seq(
+    Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220),
+    Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118),
+    Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322),
+    Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026),
+    Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
+  )
+
+  val decompositionData: Seq[Vector] = Seq.tabulate(25) { i: Int =>
+    Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+  }
+
+  val rData = Seq(
+    Vectors.dense(-0.6264538, 0.1836433), Vectors.dense(-0.8356286, 1.5952808),
+    Vectors.dense(0.3295078, -0.8204684), Vectors.dense(0.4874291, 0.7383247),
+    Vectors.dense(0.5757814, -0.3053884), Vectors.dense(1.5117812, 0.3898432),
+    Vectors.dense(-0.6212406, -2.2146999), Vectors.dense(11.1249309, 9.9550664),
+    Vectors.dense(9.9838097, 10.9438362), Vectors.dense(10.8212212, 10.5939013),
+    Vectors.dense(10.9189774, 10.7821363), Vectors.dense(10.0745650, 8.0106483),
+    Vectors.dense(10.6198257, 9.9438713), Vectors.dense(9.8442045, 8.5292476),
+    Vectors.dense(9.5218499, 10.4179416)
+  )
+
+  case class FeatureData(features: Vector)
+
+  def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
+    assert(m1.weights.length === m2.weights.length)
+    for (i <- m1.weights.indices) {
+      assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
--- End diff --

Why not also check the weights here?





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-07 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95064130
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
--- End diff --

In most of the other test suites in ML we have a test that checks the prediction/transform methods: for example, checking that the prediction always matches the highest probability, and that the probabilities sum to one. I don't see much reason to diverge from that pattern here; what do you think @yanboliang? A sketch is below.
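
(A sketch of such a test, assuming this suite's dataset and seed fixtures and the usual TestingUtils implicits for ~==.)

    import org.apache.spark.sql.Row
    import org.apache.spark.ml.linalg.Vector

    test("transform: prediction matches argmax of probability") {
      val model = new GaussianMixture().setK(3).setSeed(seed).fit(dataset)
      model.transform(dataset).select("prediction", "probability").collect()
        .foreach { case Row(pred: Int, prob: Vector) =>
          assert(pred === prob.argmax)                  // hard assignment
          assert(prob.toArray.sum ~== 1.0 absTol 1E-6)  // sums to one
        }
    }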





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-07 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95058896
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,104 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+    val actual = new GaussianMixture().setK(2).setSeed(seed).fit(denseDataset)
+    modelEquals(expected, actual)
+  }
+
+  test("univariate sparse data with two clusters") {
--- End diff --

Yeah, I went ahead and merged them together. Thanks.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95039573
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,104 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+    val actual = new GaussianMixture().setK(2).setSeed(seed).fit(denseDataset)
+    modelEquals(expected, actual)
+  }
+
+  test("univariate sparse data with two clusters") {
--- End diff --

I'd be in favor of merging them, since they are so nearly identical.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95039050
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangularValues: Array[Double]): Array[Double] = {
+    val symmetricValues = new Array[Double](n * n)
+    var r = 0
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j <= i) {
+        symmetricValues(i * n + j) = triangularValues(r)
+        symmetricValues(j * n + i) = triangularValues(r)
+        r += 1
+        j += 1
+      }
+      i += 1
+    }
+    symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95031946
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,104 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+    val actual = new GaussianMixture().setK(2).setSeed(seed).fit(denseDataset)
+    modelEquals(expected, actual)
+  }
+
+  test("univariate sparse data with two clusters") {
--- End diff --

Actually, don't bother.  This looks ready.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95031349
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangularValues: Array[Double]): Array[Double] = {
+    val symmetricValues = new Array[Double](n * n)
+    var r = 0
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j <= i) {
+        symmetricValues(i * n + j) = triangularValues(r)
+        symmetricValues(j * n + i) = triangularValues(r)
+        r += 1
+        j += 1
+      }
+      i += 1
+    }
+    symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95031388
  
--- Diff: python/pyspark/ml/clustering.py ---
@@ -95,15 +95,10 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter
     >>> weights = model.weights
     >>> len(weights)
     3
-    >>> model.gaussiansDF.show()
-    +--------------------+--------------------+
-    |                mean|                 cov|
-    +--------------------+--------------------+
-    |[0.8250140229...|0.0056256...|
-    |[-0.4777098016092...|0.167969502720916...|
-    |[-0.4472625243352...|0.167304119758233...|
-    +--------------------+--------------------+
-    ...
+    >>> model.gaussiansDF.select("mean").head()
--- End diff --

Right, that makes sense.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r95031363
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -126,9 +143,104 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     testEstimatorAndModelReadWrite(gm, dataset,
       GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+    val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+    val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+    val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+    val gaussians = means.zip(covs).map { case (mean, cov) =>
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+    val actual = new GaussianMixture().setK(2).setSeed(seed).fit(denseDataset)
+    modelEquals(expected, actual)
+  }
+
+  test("univariate sparse data with two clusters") {
--- End diff --

This and the previous test are almost the same. How about combining them via a helper function, like the sketch below?
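
(For instance, something along these lines, reusing the suite's seed, modelEquals, denseDataset and sparseDataset fixtures; Dataset comes from org.apache.spark.sql.)

    // Shared body for the dense and sparse variants of the two-cluster test.
    def checkTwoClusterFit(dataset: Dataset[_], expected: GaussianMixtureModel): Unit = {
      val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
      modelEquals(expected, actual)
    }

    checkTwoClusterFit(denseDataset, expected)
    checkTwoClusterFit(sparseDataset, expected)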





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94921866
  
--- Diff: python/pyspark/ml/clustering.py ---
@@ -95,15 +95,10 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter
     >>> weights = model.weights
     >>> len(weights)
     3
-    >>> model.gaussiansDF.show()
-    +--------------------+--------------------+
-    |                mean|                 cov|
-    +--------------------+--------------------+
-    |[0.8250140229...|0.0056256...|
-    |[-0.4777098016092...|0.167969502720916...|
-    |[-0.4472625243352...|0.167304119758233...|
-    +--------------------+--------------------+
-    ...
+    >>> model.gaussiansDF.select("mean").head()
--- End diff --

Using fewer digits cannot fix the flaky test: the output of ```gaussiansDF.show()``` is a plain string, so we would have to match it exactly. Also, ```gaussiansDF.show()``` only displays part of the whole DataFrame and truncates the rest, so the original test did not actually check all cells of ```gaussiansDF```.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2017-01-06 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94920036
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangularValues: Array[Double]): Array[Double] = {
+    val symmetricValues = new Array[Double](n * n)
+    var r = 0
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j <= i) {
+        symmetricValues(i * n + j) = triangularValues(r)
+        symmetricValues(j * n + i) = triangularValues(r)
+        r += 1
+        j += 1
+      }
+      i += 1
+    }
+    symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94082859
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
--- End diff --

Elsewhere too (e.g., in ExpectationAggregator)





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94083864
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangularValues: Array[Double]): Array[Double] = {
+    val symmetricValues = new Array[Double](n * n)
+    var r = 0
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j <= i) {
+        symmetricValues(i * n + j) = triangularValues(r)
+        symmetricValues(j * n + i) = triangularValues(r)
+        r += 1
+        j += 1
+      }
+      i += 1
+    }
+    symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94084799
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -323,27 +326,95 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  /**
+   * Number of samples per cluster to use when initializing Gaussians.
+   */
+  private val numSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
     transformSchema(dataset.schema, logging = true)
-    val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-      case Row(point: Vector) => OldVectors.fromML(point)
-    }
 
-    val instr = Instrumentation.create(this, rdd)
+    val sc = dataset.sparkSession.sparkContext
+    val numClusters = $(k)
+
+    val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
+      case Row(features: Vector) => features
+    }.cache()
+
+    // Extract the number of features.
+    val numFeatures = instances.first().size
+
+    val instr = Instrumentation.create(this, instances)
     instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
 
-    val algo = new MLlibGM()
-      .setK($(k))
-      .setMaxIterations($(maxIter))
-      .setSeed($(seed))
-      .setConvergenceTol($(tol))
-    val parentModel = algo.run(rdd)
-    val gaussians = parentModel.gaussians.map { case g =>
-      new MultivariateGaussian(g.mu.asML, g.sigma.asML)
+    val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
+      numClusters, numFeatures)
+
+    // TODO: SPARK-15785 Support users supplied initial GMM.
+    val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
+
+    var logLikelihood = Double.MinValue
+    var logLikelihoodPrev = 0.0
+
+    var iter = 0
+    while (iter < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
+
+      val bcWeights = instances.sparkContext.broadcast(weights)
+      val bcGaussians = instances.sparkContext.broadcast(gaussians)
+
+      // aggregate the cluster contribution for all sample points
+      val sums = instances.treeAggregate(
+        new ExpectationAggregator(numFeatures, bcWeights, bcGaussians))(
+        seqOp = (c, v) => (c, v) match {
+          case (aggregator, instance) => aggregator.add(instance)
+        },
+        combOp = (c1, c2) => (c1, c2) match {
+          case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
+        })
+
+      bcWeights.destroy(blocking = false)
+      bcGaussians.destroy(blocking = false)
+
+      /*
+         Create new distributions based on the partial assignments
+         (often referred to as the "M" step in literature)
+       */
+      val sumWeights = sums.weights.sum
+
+      if (shouldDistributeGaussians) {
+        val numPartitions = math.min(numClusters, 1024)
+        val tuples = Seq.tabulate(numClusters) { i =>
+          (sums.means(i), sums.covs(i), sums.weights(i))
+        }
+        val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, cov, weight) =>
+          GaussianMixture.updateWeightsAndGaussians(mean, cov, weight, sumWeights)
+        }.collect().unzip
+        Array.copy(ws.toArray, 0, weights, 0, ws.length)
+        Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
+      } else {
+        var i = 0
+        while (i < numClusters) {
+          val (weight, gaussian) = GaussianMixture.updateWeightsAndGaussians(
+            sums.means(i), sums.covs(i), sums.weights(i), sumWeights)
+          weights(i) = weight
+          gaussians(i) = gaussian
+          i += 1
+        }
+      }
+
+      logLikelihoodPrev = logLikelihood   // current becomes previous
+      logLikelihood = sums.logLikelihood  // this is the freshly computed log-likelihood
+      iter += 1
     }
-    val model = copyValues(new GaussianMixtureModel(uid, parentModel.weights, gaussians))
-      .setParent(this)
+
+    val gaussianDists = gaussians.map { case (mean, covVec) =>
+      val cov = new DenseMatrix(numFeatures, numFeatures,
+        GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
--- End diff --

Below here, we call logNumFeatures. This isn't part of your PR, but could you move it earlier, since numFeatures is now computed at the top of fit()?

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94084884
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
--- End diff --

You always use this right away by converting it to a DenseMatrix, so how 
about just returning a DenseMatrix?
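
(I.e., something like this sketch, constructing the DenseMatrix inside the helper instead of at every call site; the body is the same as in the PR, only the return type changes.)

    private[clustering] def unpackUpperTriangularMatrix(
        n: Int,
        triangularValues: Array[Double]): DenseMatrix = {
      val symmetricValues = new Array[Double](n * n)
      var r = 0
      var i = 0
      while (i < n) {
        var j = 0
        while (j <= i) {
          symmetricValues(i * n + j) = triangularValues(r)
          symmetricValues(j * n + i) = triangularValues(r)
          r += 1
          j += 1
        }
        i += 1
      }
      new DenseMatrix(n, n, symmetricValues) // return the matrix directly
    }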



[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94083559
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian distributions. Note the
+   *         covariance matrix of multivariate gaussian distribution is symmetric and
+   *         we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+      instances: RDD[Vector],
+      numClusters: Int,
+      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
+    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
+    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
+      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+      val mean = {
+        val v = new DenseVector(new Array[Double](numFeatures))
+        var i = 0
+        while (i < numSamples) {
+          BLAS.axpy(1.0, slice(i), v)
+          i += 1
+        }
+        BLAS.scal(1.0 / numSamples, v)
+        v
+      }
+      /*
+         Construct matrix where diagonal entries are element-wise
+         variance of input vectors (computes biased variance).
+         Since the covariance matrix of multivariate gaussian distribution is symmetric,
+         only the upper triangular part of the matrix will be saved as a dense vector
+         in order to reduce the shuffled data size.
+       */
+      val cov = {
+        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+        val diagVec = Vectors.fromBreeze(ss)
+        BLAS.scal(1.0 / numSamples, diagVec)
+        val covVec = new DenseVector(Array.fill[Double](
+          numFeatures * (numFeatures + 1) / 2)(0.0))
+        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+          covVec.values(i + i * (i + 1) / 2) = v
+        }
+        covVec
+      }
+      (mean, cov)
+    }
+    (weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+      numClusters: Int,
+      numFeatures: Int): Boolean = {
+    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix packed in an array
+   *                         (column major).
+   * @return An array which represents the symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangularValues: Array[Double]): Array[Double] = {
+    val symmetricValues = new Array[Double](n * n)
+    var r = 0
+    var i = 0
+    while (i < n) {
+      var j = 0
+      while (j <= i) {
+        symmetricValues(i * n + j) = triangularValues(r)
+        symmetricValues(j * n + i) = triangularValues(r)
+        r += 1
+        j += 1
+      }
+      i += 1
+    }
+    symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94086090
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala 
---
@@ -126,6 +141,106 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext
 testEstimatorAndModelReadWrite(gm, dataset,
   GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+val mean = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+val cov = Array(Matrices.dense(1, 1, Array(0.86644)), 
Matrices.dense(1, 1, Array(1.1098)))
+
+val gmm = new GaussianMixture().setK(2).fit(denseDataset)
+
+assert(gmm.weights(0) ~== weights(0) absTol 1E-3)
+assert(gmm.weights(1) ~== weights(1) absTol 1E-3)
+assert(gmm.gaussians(0).mean ~== mean(0) absTol 1E-3)
+assert(gmm.gaussians(1).mean ~== mean(1) absTol 1E-3)
+assert(gmm.gaussians(0).cov ~== cov(0) absTol 1E-3)
+assert(gmm.gaussians(1).cov ~== cov(1) absTol 1E-3)
+  }
+
+  test("univariate sparse data with two clusters") {
--- End diff --

This and the previous tests are almost the same.  How about combining them 
via a helper function?
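
Something like the following could do it (a sketch; `modelEquals` is just an
illustrative name here, and `~==`/`absTol` come from TestingUtils):

```scala
// Hypothetical helper asserting two GaussianMixtureModels match within a
// tolerance; assumes both models list their components in the same order.
def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
  assert(m1.weights.length === m2.weights.length)
  for (i <- m1.weights.indices) {
    assert(m1.weights(i) ~== m2.weights(i) absTol 1E-3)
    assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
    assert(m1.gaussians(i).cov ~== m2.gaussians(i).cov absTol 1E-3)
  }
}
```

Each of the dense and sparse tests would then reduce to fitting a model and
calling `modelEquals(expected, actual)`.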





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94085951
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala 
---
@@ -126,6 +141,106 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext
 testEstimatorAndModelReadWrite(gm, dataset,
   GaussianMixtureSuite.allParamSettings, checkModelData)
   }
+
+  test("univariate dense data with two clusters") {
+val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+val mean = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+val cov = Array(Matrices.dense(1, 1, Array(0.86644)), 
Matrices.dense(1, 1, Array(1.1098)))
+
+val gmm = new GaussianMixture().setK(2).fit(denseDataset)
+
+assert(gmm.weights(0) ~== weights(0) absTol 1E-3)
+assert(gmm.weights(1) ~== weights(1) absTol 1E-3)
+assert(gmm.gaussians(0).mean ~== mean(0) absTol 1E-3)
+assert(gmm.gaussians(1).mean ~== mean(1) absTol 1E-3)
+assert(gmm.gaussians(0).cov ~== cov(0) absTol 1E-3)
+assert(gmm.gaussians(1).cov ~== cov(1) absTol 1E-3)
+  }
+
+  test("univariate sparse data with two clusters") {
+val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+val mean = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+val cov = Array(Matrices.dense(1, 1, Array(0.86644)), 
Matrices.dense(1, 1, Array(1.1098)))
+
+val gmm = new GaussianMixture().setK(2).fit(sparseDataset)
--- End diff --

Set the seed in this and other tests





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94086048
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala 
---
@@ -18,22 +18,37 @@
 package org.apache.spark.ml.clustering
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Dataset
 
 
 class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest {
 
+  import testImplicits._
+  import GaussianMixtureSuite._
+
   final val k = 5
   @transient var dataset: Dataset[_] = _
+  @transient var denseDataset: Dataset[_] = _
+  @transient var sparseDataset: Dataset[_] = _
+  @transient var decompositionDataset: Dataset[_] = _
+  @transient var rDataset: Dataset[_] = _
 
   override def beforeAll(): Unit = {
 super.beforeAll()
 
 dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
+denseDataset = denseData.map(FeatureData).toDF()
+sparseDataset = denseData.map { point =>
+  FeatureData(Vectors.sparse(1, Array(0), point.toArray))
--- End diff --

Simplify: ```point.toSparse```





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94084918
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
 validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and 
diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian 
distributions. Note the
+   * covariance matrix of multivariate gaussian distribution is 
symmetric and
+   * we only save the upper triangular part as a dense vector.
+   */
+  private def initRandom(
+  instances: RDD[Vector],
+  numClusters: Int,
+  numFeatures: Int): (Array[Double], Array[(DenseVector, 
DenseVector)]) = {
+val samples = instances.takeSample(withReplacement = true, numClusters 
* numSamples, $(seed))
+val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+val gaussians: Array[(DenseVector, DenseVector)] = 
Array.tabulate(numClusters) { i =>
+  val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+  val mean = {
+val v = new DenseVector(new Array[Double](numFeatures))
+var i = 0
+while (i < numSamples) {
+  BLAS.axpy(1.0, slice(i), v)
+  i += 1
+}
+BLAS.scal(1.0 / numSamples, v)
+v
+  }
+  /*
+ Construct matrix where diagonal entries are element-wise
+ variance of input vectors (computes biased variance).
+ Since the covariance matrix of multivariate gaussian distribution 
is symmetric,
+ only the upper triangular part of the matrix will be saved as a 
dense vector
+ in order to reduce the shuffled data size.
+   */
+  val cov = {
+val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+val diagVec = Vectors.fromBreeze(ss)
+BLAS.scal(1.0 / numSamples, diagVec)
+val covVec = new DenseVector(Array.fill[Double](
+  numFeatures * (numFeatures + 1) / 2)(0.0))
+diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+  covVec.values(i + i * (i + 1) / 2) = v
+}
+covVec
+  }
+  (mean, cov)
+}
+(weights, gaussians)
+  }
 }
 
 @Since("2.0.0")
 object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * numFeatures > 25 except for when numClusters is very small.
+   *
+   * @param numClusters  Number of clusters
+   * @param numFeatures  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(
+  numClusters: Int,
+  numFeatures: Int): Boolean = {
+((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+  }
+
+  /**
+   * Convert an n * (n + 1) / 2 dimension array representing the upper 
triangular part of a matrix
+   * into an n * n array representing the full symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangularValues The upper triangular part of the matrix 
packed in an array
+   * (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangularValues: Array[Double]): Array[Double] = {
+val symmetricValues = new Array[Double](n * n)
+var r = 0
+var i = 0
+while(i < n) {
+  var j = 0
+  while (j <= i) {
+symmetricValues(i * n + j) = triangularValues(r)
+symmetricValues(j * n + i) = triangularValues(r)
+r += 1
+j += 1
+  }
+  i += 1
+}
+symmetricValues
+  }
+
+  /**
+   * Update the weight, mean and covariance of gaussian distribution.
+   *
+   * @param mean The mean of the gaussian distribution.

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94085211
  
--- Diff: python/pyspark/ml/clustering.py ---
@@ -95,15 +95,10 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 >>> weights = model.weights
 >>> len(weights)
 3
->>> model.gaussiansDF.show()
-+++
-|mean| cov|
-+++
-|[0.8250140229...|0.0056256...|
-|[-0.4777098016092...|0.167969502720916...|
-|[-0.4472625243352...|0.167304119758233...|
-+++
-...
+>>> model.gaussiansDF.select("mean").head()
--- End diff --

I like the table for documentation though.  Does using fewer digits 
stabilize it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-12-28 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r94078123
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
   override def transformSchema(schema: StructType): StructType = {
 validateAndTransformSchema(schema)
   }
+
+  /**
+   * Initialize weights and corresponding gaussian distributions at random.
+   *
+   * We start with uniform weights, a random mean from the data, and 
diagonal covariance matrices
+   * using component variances derived from the samples.
+   *
+   * @param instances The training instances.
+   * @param numClusters The number of clusters.
+   * @param numFeatures The number of features of training instance.
+   * @return The initialized weights and corresponding gaussian 
distributions. Note the
+   * covariance matrix of multivariate gaussian distribution is 
symmetric and
+   * we only save the upper triangular part as a dense vector.
--- End diff --

Document that the matrix is in column-major order.
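
For reference, a small worked example of the packed layout in question,
assuming n = 3 (the matrix entries are hypothetical):

```scala
// A symmetric 3 x 3 matrix and its packed upper triangle, column major:
//
//   | a  b  d |
//   | b  c  e |   ->   packed array: [a, b, c, d, e, f]
//   | d  e  f |
//
// Entry (row, col) with row <= col lives at index row + col * (col + 1) / 2;
// the diagonal entry (i, i) is therefore at i + i * (i + 1) / 2, which is
// exactly the covVec indexing used in the diff above.
def packedIndex(row: Int, col: Int): Int = {
  require(row <= col, "only the upper triangle is stored")
  row + col * (col + 1) / 2
}
```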





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-11-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r86948993
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala 
---
@@ -130,4 +230,29 @@ object GaussianMixtureSuite {
 "maxIter" -> 2,
 "tol" -> 0.01
   )
+
+  val denseData = Seq(
--- End diff --

We should copy test cases and data from mllib to ml when copying an algorithm 
implementation. Since the spark.mllib package will eventually be deleted (maybe 
in 3.0?), we should make sure spark.ml includes all necessary data.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-11-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r86948383
  
--- Diff: python/pyspark/ml/clustering.py ---
@@ -95,15 +95,10 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 >>> weights = model.weights
 >>> len(weights)
 3
->>> model.gaussiansDF.show()
-+++
-|mean| cov|
-+++
-|[0.8250140229...|0.0056256...|
-|[-0.4777098016092...|0.167969502720916...|
-|[-0.4472625243352...|0.167304119758233...|
-+++
-...
+>>> model.gaussiansDF.select("mean").head()
--- End diff --

The test is unstable, and it's not necessary to check so many digits.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-11-08 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r86947971
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+val localWeights = bcWeights.value
+
+val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
+  EPSILON + weight * gaussian.pdf(data)
+}
+val pSum = p.sum
+newLogLikelihood += math.log(pSum)
+var i = 0
+while(i < k) {
+  p(i) /= pSum
+  newWeights(i) += p(i)
--- End diff --

I thought the number of iterations (equal to the number of clusters) was small 
enough to ignore the impact, compared with the gradient dimension in LiR/LoR, 
which is often as large as millions or billions, but it's better if we can 
avoid the extra cost.



[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85185547
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
--- End diff --

maybe `nClusters`? I don't have a strong preference but I think it's 
clearer when reading the code.





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85186676
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
+
+val instances: RDD[Vector] = 
dataset.select(col($(featuresCol))).rdd.map {
+  case Row(features: Vector) => features
+}.cache()
+
+// Extract the number of features.
--- End diff --

this comment adds no value IMO





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85160212
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+val localWeights = bcWeights.value
+
+val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
--- End diff --

This will allocate an intermediate zipped array. Maybe we can use a while 
loop and also collect `pSum` inside it. We should use a `localGaussians` 
reference as well.
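
A sketch of that rewrite (assuming `k`, `data`, `localWeights`, `oldGaussians`
and `EPSILON` already in scope inside `add`, as in the diff above):

```scala
// Hypothetical: fill p and accumulate pSum in a single while loop,
// avoiding the intermediate array allocated by zip/map.
val localGaussians = oldGaussians
val p = new Array[Double](k)
var pSum = 0.0
var i = 0
while (i < k) {
  p(i) = EPSILON + localWeights(i) * localGaussians(i).pdf(data)
  pSum += p(i)
  i += 1
}
```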




[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85188373
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
--- End diff --

nit: 25.0. Also, let's call it `numFeatures` instead of `d`
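
As a quick sanity check on the heuristic (illustrative numbers only):
`((k - 1.0) / k) * d` approaches `d` as `k` grows, so the test is roughly
"distribute when d > 25, unless k is very small".

```scala
shouldDistributeGaussians(k = 2, d = 50)    // (1/2) * 50    = 25.0  -> false
shouldDistributeGaussians(k = 2, d = 51)    // (1/2) * 51    = 25.5  -> true
shouldDistributeGaussians(k = 10, d = 28)   // (9/10) * 28   = 25.2  -> true
shouldDistributeGaussians(k = 100, d = 26)  // (99/100) * 26 = 25.74 -> true
```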





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85025434
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
+
+val instances: RDD[Vector] = 
dataset.select(col($(featuresCol))).rdd.map {
+  case Row(features: Vector) => features
+}.cache()
+
+// Extract the number of features.
+val numFeatures = instances.first().size
+
+val shouldDistributeGaussians = 
GaussianMixture.shouldDistributeGaussians(_k, numFeatures)
+
+// Determine initial weights and corresponding Gaussians.
+// We start with uniform weights, a random mean from the data, and
+// diagonal covariance matrices using component variances
+// derived from the samples.
+// TODO: Support users supplied initial GMM.
+val samples = instances.takeSample(withReplacement = true, _k * 
nSamples, $(seed))
+val weights: Array[Double] = Array.fill(_k)(1.0 / _k)
+/**
+ * Since the covariance matrix of multivariate gaussian distribution 
is symmetric,
+ * only the upper triangular part of the matrix will be stored as a 
dense vector
+ * in order to reduce the shuffled data size.
+ */
+val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(_k) 
{ i =>
--- End diff --

I think it would be nice to factor this out into an initialization method 
so we can just call `val gaussians = initRandom(...)` or similar.
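
A sketch of the shape this could take (body elided with `???`; the signature
here just mirrors the inline code above and is not final):

```scala
// Hypothetical factored-out initializer; returns uniform weights plus one
// (mean, packed diagonal covariance) pair per cluster.
private def initRandom(
    instances: RDD[Vector],
    numClusters: Int,
    numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = ???

// Call site in fit():
val (weights, gaussians) = initRandom(instances, _k, numFeatures)
```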





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85175966
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
--- End diff --

call it `instance` instead, since "data" is plural





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85151727
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+val localWeights = bcWeights.value
+
+val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
+  EPSILON + weight * gaussian.pdf(data)
+}
+val pSum = p.sum
+newLogLikelihood += math.log(pSum)
+var i = 0
+while(i < k) {
+  p(i) /= pSum
+  newWeights(i) += p(i)
+  BLAS.axpy(p(i), data, newMeans(i))
+  BLAS.spr(p(i), data, newCovs(i))
+  i += 1
+}
+
+totalCnt += 1
+this
+  }
+
+  /**
+   * Merge another ExpectationAggregator, and update the log likelihood,
+   * weights for each distribution, means and 

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85176823
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+val localWeights = bcWeights.value
+
+val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
+  EPSILON + weight * gaussian.pdf(data)
+}
+val pSum = p.sum
+newLogLikelihood += math.log(pSum)
+var i = 0
+while(i < k) {
+  p(i) /= pSum
+  newWeights(i) += p(i)
+  BLAS.axpy(p(i), data, newMeans(i))
+  BLAS.spr(p(i), data, newCovs(i))
+  i += 1
+}
+
+totalCnt += 1
+this
+  }
+
+  /**
+   * Merge another ExpectationAggregator, and update the log likelihood,
+   * weights for each distribution, means and 

[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85157509
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends 
Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 
2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, 
covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the 
log likelihood,
+   * weights for each distribution, means and covariances for all 
distributions.
+   *
+   * @param data The instance of data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+val localWeights = bcWeights.value
+
+val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
+  EPSILON + weight * gaussian.pdf(data)
+}
+val pSum = p.sum
+newLogLikelihood += math.log(pSum)
+var i = 0
+while(i < k) {
+  p(i) /= pSum
+  newWeights(i) += p(i)
--- End diff --

use local pointers `localNewWeights`, `localNewMeans`, `localNewCovs`
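
i.e., something along these lines (a sketch; `p`, `pSum`, `data` and `k` are
the surrounding locals from `add` in the diff above):

```scala
// Hypothetical: hoist the field reads out of the hot loop so each iteration
// works against local references instead of re-reading the fields.
val localNewWeights = newWeights
val localNewMeans = newMeans
val localNewCovs = newCovs
var i = 0
while (i < k) {
  p(i) /= pSum
  localNewWeights(i) += p(i)
  BLAS.axpy(p(i), data, localNewMeans(i))
  BLAS.spr(p(i), data, localNewCovs(i))
  i += 1
}
```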



[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85186038
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
+
+val instances: RDD[Vector] = 
dataset.select(col($(featuresCol))).rdd.map {
+  case Row(features: Vector) => features
+}.cache()
+
+// Extract the number of features.
+val numFeatures = instances.first().size
+
+val shouldDistributeGaussians = 
GaussianMixture.shouldDistributeGaussians(_k, numFeatures)
+
+// Determine initial weights and corresponding Gaussians.
+// We start with uniform weights, a random mean from the data, and
+// diagonal covariance matrices using component variances
+// derived from the samples.
+// TODO: Support users supplied initial GMM.
--- End diff --

Let's make a JIRA for it and reference that





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85179044
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
+
+val instances: RDD[Vector] = 
dataset.select(col($(featuresCol))).rdd.map {
+  case Row(features: Vector) => features
+}.cache()
+
+// Extract the number of features.
+val numFeatures = instances.first().size
+
+val shouldDistributeGaussians = 
GaussianMixture.shouldDistributeGaussians(_k, numFeatures)
+
+// Determine initial weights and corresponding Gaussians.
+// We start with uniform weights, a random mean from the data, and
+// diagonal covariance matrices using component variances
+// derived from the samples.
+// TODO: Support users supplied initial GMM.
+val samples = instances.takeSample(withReplacement = true, _k * 
nSamples, $(seed))
+val weights: Array[Double] = Array.fill(_k)(1.0 / _k)
+/**
--- End diff --

AFAIK we typically use `/* ... */` comments for longer comments inside 
method bodies like this





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85175762
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
--- End diff --

Why are we using "new" as a prefix here and elsewhere?





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85187274
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s,
+   * approximately when d > 25, except when k is very small.
+   *
+   * @param k  Number of clusters
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
--- End diff --

Let's use `while` loops since this is called in several places; Scala `for` comprehensions desugar to closure-based `foreach` calls, which adds overhead on hot paths.
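
As a sketch, the quoted method rewritten with `while` loops (same logic as the diff above; `triangularValues` is just an illustrative rename):

```scala
private[clustering] def unpackUpperTriangularMatrix(
    n: Int,
    triangularValues: Array[Double]): Array[Double] = {
  // new Array[Double] is zero-initialized, so off-diagonal gaps start at 0.0
  val symmetricValues = new Array[Double](n * n)
  var r = 0
  var i = 0
  while (i < n) {
    var j = 0
    while (j <= i) {
      // mirror each packed entry into both halves of the full matrix
      symmetricValues(i * n + j) = triangularValues(r)
      symmetricValues(j * n + i) = triangularValues(r)
      r += 1
      j += 1
    }
    i += 1
  }
  symmetricValues
}
```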





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85152196
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s,
+   * approximately when d > 25, except when k is very small.
+   *
+   * @param k  Number of clusters
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
--- End diff --

use `new Array[Double](size)` instead of `Array.fill(size)(0.0)`
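
A small sketch of the difference (standard Scala/JVM behavior, not code from the patch): `Array.fill` evaluates its by-name argument once per element, while `new Array[Double]` relies on the JVM zero-initializing primitive arrays.

```scala
val size = 4
// Allocates, then runs the by-name argument `size` times.
val viaFill = Array.fill(size)(0.0)
// Zero-initialized on allocation; no per-element work.
val viaNew = new Array[Double](size)
assert(viaFill.sameElements(viaNew))
```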





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85175523
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s,
+   * approximately when d > 25, except when k is very small.
+   *
+   * @param k  Number of clusters
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
--- End diff --

Add doc





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85179222
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
--- End diff --

use `/** ... */` instead





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85177022
  
--- Diff: python/pyspark/ml/clustering.py ---
@@ -95,15 +95,10 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, 
HasPredictionCol, HasMaxIte
 >>> weights = model.weights
 >>> len(weights)
 3
->>> model.gaussiansDF.show()
-+++
-|mean| cov|
-+++
-|[0.8250140229...|0.0056256...|
-|[-0.4777098016092...|0.167969502720916...|
-|[-0.4472625243352...|0.167304119758233...|
-+++
-...
+>>> model.gaussiansDF.select("mean").head()
--- End diff --

Why are we changing this?





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85178523
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala 
---
@@ -130,4 +230,29 @@ object GaussianMixtureSuite {
 "maxIter" -> 2,
 "tol" -> 0.01
   )
+
+  val denseData = Seq(
--- End diff --

Can we just import it from mllib instead? Same for `decompositionData`





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85189028
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -316,24 +319,129 @@ class GaussianMixture @Since("2.0.0") (
   @Since("2.0.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  // number of samples per cluster to use when initializing Gaussians
+  private val nSamples = 5
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
 transformSchema(dataset.schema, logging = true)
-val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-  case Row(point: Vector) => OldVectors.fromML(point)
+
+val sc = dataset.sparkSession.sparkContext
+val _k = $(k)
+
+val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
+  case Row(features: Vector) => features
+}.cache()
+
+// Extract the number of features.
+val numFeatures = instances.first().size
+
+val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(_k, numFeatures)
+
+// Determine initial weights and corresponding Gaussians.
+// We start with uniform weights, a random mean from the data, and
+// diagonal covariance matrices using component variances
+// derived from the samples.
+// TODO: Support users supplied initial GMM.
+val samples = instances.takeSample(withReplacement = true, _k * nSamples, $(seed))
+val weights: Array[Double] = Array.fill(_k)(1.0 / _k)
+/**
+ * Since the covariance matrix of multivariate gaussian distribution 
is symmetric,
+ * only the upper triangular part of the matrix will be stored as a 
dense vector
+ * in order to reduce the shuffled data size.
+ */
+val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(_k) { i =>
+  val slice = samples.view(i * nSamples, (i + 1) * nSamples)
+  val mean = {
+val v = new DenseVector(Array.fill[Double](numFeatures)(0.0))
+var i = 0
+while (i < nSamples) {
+  BLAS.axpy(1.0, slice(i), v)
+  i += 1
+}
+BLAS.scal(1.0 / nSamples, v)
+v
+  }
+  /**
+   * Construct matrix where diagonal entries are element-wise
+   * variance of input vectors (computes biased variance).
+   */
+  val cov = {
+val ss = new DenseVector(Array.fill[Double](numFeatures)(0)).asBreeze
+slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+val diagVec = Vectors.fromBreeze(ss)
+BLAS.scal(1.0 / nSamples, diagVec)
+val covVec = new DenseVector(Array.fill[Double](
+  numFeatures * (numFeatures + 1) / 2)(0.0))
+diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+  covVec.values(i + i * (i + 1) / 2) = v
+}
+covVec
+  }
+  (mean, cov)
 }
 
-val algo = new MLlibGM()
-  .setK($(k))
-  .setMaxIterations($(maxIter))
-  .setSeed($(seed))
-  .setConvergenceTol($(tol))
-val parentModel = algo.run(rdd)
-val gaussians = parentModel.gaussians.map { case g =>
-  new MultivariateGaussian(g.mu.asML, g.sigma.asML)
+var llh = Double.MinValue // current log-likelihood
--- End diff --

minor: why not `logLikelihood` and `logLikelihoodPrev`? It's nice to have descriptive variable names; then we can remove the comments.
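
A minimal sketch of the suggested naming (`maxIter` and `tol` are placeholder values here, not the estimator's params, and the loop body is stubbed out):

```scala
val maxIter = 100
val tol = 0.01
var logLikelihood = Double.MinValue   // name documents itself; no comment needed
var logLikelihoodPrev = 0.0
var iteration = 0
while (iteration < maxIter && math.abs(logLikelihood - logLikelihoodPrev) > tol) {
  logLikelihoodPrev = logLikelihood
  logLikelihood = 0.0  // stand-in for the E-step aggregate recomputed each pass
  iteration += 1
}
```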





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85176006
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s,
+   * approximately when d > 25, except when k is very small.
+   *
+   * @param k  Number of clusters
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution 
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian 
(Normal) Distribution
+ *in the mixture. Note only upper triangular part of 
the covariance
+ *matrix of each distribution is stored as dense 
vector in order to
+ *reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+numFeatures: Int,
+bcWeights: Broadcast[Array[Double]],
+bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+bcGaussians.value.map { case (mean, covVec) =>
+  val cov = new DenseMatrix(numFeatures, numFeatures,
+GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
+  new MultivariateGaussian(mean, cov)
+}
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training data to this ExpectationAggregator, and update the log likelihood,
--- End diff --

"Add a new training instance"





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85189652
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s,
+   * approximately when d > 25, except when k is very small.
+   *
+   * @param k  Number of clusters
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in 
an array (column major).
+   * @return An array which represents the symmetric matrix in column 
major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+  n: Int,
+  triangular: Array[Double]): Array[Double] = {
+val symmetric = Array.fill(n * n)(0.0)
+var r = 0
+for (i <- 0 until n) {
+  for (j <- 0 to i) {
+symmetric(i * n + j) = triangular(r)
+symmetric(j * n + i) = triangular(r)
+r += 1
+  }
+}
+symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+  mean: DenseVector,
+  cov: DenseVector,
+  weight: Double,
+  sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+BLAS.scal(1.0 / weight, mean)
+BLAS.spr(-weight, mean, cov)
+BLAS.scal(1.0 / weight, cov)
+val newWeight = weight / sumWeights
+val newGaussian = (mean, cov)
+(newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
--- End diff --

We have typically used this documentation as a place to explain the math 
used to compute updates. It would be nice to have that here as well.
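
For reference, a sketch of the standard EM updates such a doc comment would spell out (textbook GMM equations, not text from the patch; \(\gamma_{ik}\) is the responsibility of component \(k\) for instance \(x_i\)):

```latex
% E-step: responsibilities from the current parameters
\gamma_{ik} = \frac{w_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k)}
                   {\sum_{j=1}^{K} w_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)}

% M-step: re-estimate weights, means, and covariances
w_k = \frac{1}{N} \sum_{i=1}^{N} \gamma_{ik}, \qquad
\mu_k = \frac{\sum_{i} \gamma_{ik} \, x_i}{\sum_{i} \gamma_{ik}}, \qquad
\Sigma_k = \frac{\sum_{i} \gamma_{ik} \, (x_i - \mu_k)(x_i - \mu_k)^{\top}}
                {\sum_{i} \gamma_{ik}}
```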





[GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...

2016-10-26 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15413#discussion_r85188183
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends 
DefaultParamsReadable[GaussianMixture] {
 
   @Since("2.0.0")
   override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
+   * d > 25 except for when k is very small.
+   *
+   * @param k  Number of topics
+   * @param d  Number of features
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): 
Boolean = {
+((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack upper triangular part of a symmetric matrix.
--- End diff --

"Convert an n * (n + 1) / 2 dimension array representing the upper 
triangular part of a matrix into an n * n array representing the full symmetric 
matrix". I think that's more explicit about what is happening. Also very minor 
nit, can we call the array `triangularValues` instead? `triangular` sounds like 
it should be a boolean to me.
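
A small worked example of the packed layout (column-major upper triangle), assuming package-level access to the helper from the diff above:

```scala
// For n = 3, the packed array holds the upper triangle column by column:
//   Array(a11, a12, a22, a13, a23, a33)
val packed = Array(1.0, 2.0, 4.0, 3.0, 5.0, 6.0)
val full = GaussianMixture.unpackUpperTriangularMatrix(3, packed)
// full is the 3 x 3 symmetric matrix in column major:
//   1.0 2.0 3.0
//   2.0 4.0 5.0
//   3.0 5.0 6.0
assert(full.sameElements(Array(1.0, 2.0, 3.0, 2.0, 4.0, 5.0, 3.0, 5.0, 6.0)))
```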

