GitHub user sethah commented on a diff in the pull request:
https://github.com/apache/spark/pull/15413#discussion_r85151727
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -350,6 +458,145 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
@Since("2.0.0")
override def load(path: String): GaussianMixture = super.load(path)
+
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+   * d > 25, except when k is very small.
+   *
+   * @param k Number of Gaussians in the mixture.
+   * @param d Number of features.
+   */
+  private[clustering] def shouldDistributeGaussians(k: Int, d: Int): Boolean = {
+    ((k - 1.0) / k) * d > 25
+  }
+
+  /**
+   * Unpack the upper triangular part of a symmetric matrix.
+   *
+   * @param n The order of the n by n matrix.
+   * @param triangular The upper triangular part of the matrix packed in an array (column major).
+   * @return An array representing the full symmetric matrix in column major.
+   */
+  private[clustering] def unpackUpperTriangularMatrix(
+      n: Int,
+      triangular: Array[Double]): Array[Double] = {
+    val symmetric = Array.fill(n * n)(0.0)
+    var r = 0
+    for (i <- 0 until n) {
+      for (j <- 0 to i) {
+        symmetric(i * n + j) = triangular(r)
+        symmetric(j * n + i) = triangular(r)
+        r += 1
+      }
+    }
+    symmetric
+  }
+
+  private[clustering] def updateWeightsAndGaussians(
+      mean: DenseVector,
+      cov: DenseVector,
+      weight: Double,
+      sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+    BLAS.scal(1.0 / weight, mean)
+    BLAS.spr(-weight, mean, cov)
+    BLAS.scal(1.0 / weight, cov)
+    val newWeight = weight / sumWeights
+    val newGaussian = (mean, cov)
+    (newWeight, newGaussian)
+  }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) distributions in the
+ *                    mixture. Note that only the upper triangular part of the covariance matrix
+ *                    of each distribution is stored as a dense vector, in order to reduce the
+ *                    size of the shuffled data.
+ */
+private class ExpectationAggregator(
+    numFeatures: Int,
+    bcWeights: Broadcast[Array[Double]],
+    bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k: Int = bcWeights.value.length
+  private var totalCnt: Long = 0L
+  private var newLogLikelihood: Double = 0.0
+  private val newWeights: Array[Double] = Array.fill(k)(0.0)
+  private val newMeans: Array[DenseVector] = Array.fill(k)(
+    new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+  private val newCovs: Array[DenseVector] = Array.fill(k)(
+    new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))
+
+  @transient private lazy val oldGaussians = {
+    bcGaussians.value.map { case (mean, covVec) =>
+      val cov = new DenseMatrix(numFeatures, numFeatures,
+        GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
+      new MultivariateGaussian(mean, cov)
+    }
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def means: Array[DenseVector] = newMeans
+
+  def covs: Array[DenseVector] = newCovs
+
+  /**
+   * Add a new training instance to this ExpectationAggregator, and update the log likelihood,
+   * the weight for each distribution, and the means and covariances for all distributions.
+   *
+   * @param data The data point to be added.
+   * @return This ExpectationAggregator object.
+   */
+  def add(data: Vector): this.type = {
+    val localWeights = bcWeights.value
+
+    val p = localWeights.zip(oldGaussians).map { case (weight, gaussian) =>
+      EPSILON + weight * gaussian.pdf(data)
+    }
+    val pSum = p.sum
+    newLogLikelihood += math.log(pSum)
+    var i = 0
+    while (i < k) {
+      p(i) /= pSum
+      newWeights(i) += p(i)
+      BLAS.axpy(p(i), data, newMeans(i))
+      BLAS.spr(p(i), data, newCovs(i))
+      i += 1
+    }
+
+    totalCnt += 1
+    this
+  }
+
+  /**
+   * Merge another ExpectationAggregator, and update the log likelihood,
+   * the weight for each distribution, and the means and covariances for all distributions.
+   *
+   * @param other The other ExpectationAggregator to be merged.
+   * @return This ExpectationAggregator object.
+   */
+  def merge(other: ExpectationAggregator): this.type = {
+    if (other.count != 0) {
+      totalCnt += other.totalCnt
+
+      var i = 0
+      while (i < k) {
+        newWeights(i) += other.newWeights(i)
--- End diff --
Use local references here to avoid calling virtual methods on each iteration of the loop.
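
For concreteness, a minimal sketch of what that could look like, assuming the rest of merge() follows the same pattern as add() (the merging of the means and covariances is not visible in this excerpt, so it is only indicated by a comment):

```scala
// Hypothetical rewrite of the loop above: hoist the field accesses into local
// vals once, so the tight while loop reads plain local variables instead of
// going through virtual accessor calls on every iteration.
def merge(other: ExpectationAggregator): this.type = {
  if (other.count != 0) {
    totalCnt += other.totalCnt

    val localNewWeights = newWeights              // this aggregator's state
    val localOtherNewWeights = other.newWeights   // the other aggregator's state
    var i = 0
    while (i < k) {
      localNewWeights(i) += localOtherNewWeights(i)
      // ... merge the means and covariances here in the same way (not shown in this diff) ...
      i += 1
    }
  }
  this
}
```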