GitHub user jkbradley commented on a diff in the pull request:
https://github.com/apache/spark/pull/15413#discussion_r94084918
--- Diff:
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
@@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}
+
+ /**
+ * Initialize weights and corresponding gaussian distributions at random.
+ *
+ * We start with uniform weights, a random mean from the data, and
diagonal covariance matrices
+ * using component variances derived from the samples.
+ *
+ * @param instances The training instances.
+ * @param numClusters The number of clusters.
+ * @param numFeatures The number of features of training instance.
+ * @return The initialized weights and corresponding gaussian
distributions. Note the
+ * covariance matrix of multivariate gaussian distribution is
symmetric and
+ * we only save the upper triangular part as a dense vector.
+ */
+ private def initRandom(
+ instances: RDD[Vector],
+ numClusters: Int,
+ numFeatures: Int): (Array[Double], Array[(DenseVector,
DenseVector)]) = {
+ val samples = instances.takeSample(withReplacement = true, numClusters
* numSamples, $(seed))
+ val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
+ val gaussians: Array[(DenseVector, DenseVector)] =
Array.tabulate(numClusters) { i =>
+ val slice = samples.view(i * numSamples, (i + 1) * numSamples)
+ val mean = {
+ val v = new DenseVector(new Array[Double](numFeatures))
+ var i = 0
+ while (i < numSamples) {
+ BLAS.axpy(1.0, slice(i), v)
+ i += 1
+ }
+ BLAS.scal(1.0 / numSamples, v)
+ v
+ }
+ /*
+ Construct matrix where diagonal entries are element-wise
+ variance of input vectors (computes biased variance).
+ Since the covariance matrix of multivariate gaussian distribution
is symmetric,
+ only the upper triangular part of the matrix will be saved as a
dense vector
+ in order to reduce the shuffled data size.
+ */
+ val cov = {
+ val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
+ slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
+ val diagVec = Vectors.fromBreeze(ss)
+ BLAS.scal(1.0 / numSamples, diagVec)
+ val covVec = new DenseVector(Array.fill[Double](
+ numFeatures * (numFeatures + 1) / 2)(0.0))
+ diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
+ covVec.values(i + i * (i + 1) / 2) = v
+ }
+ covVec
+ }
+ (mean, cov)
+ }
+ (weights, gaussians)
+ }
}
@Since("2.0.0")
object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
@Since("2.0.0")
override def load(path: String): GaussianMixture = super.load(path)
+
+ /**
+ * Heuristic to distribute the computation of the
[[MultivariateGaussian]]s, approximately when
+ * numFeatures > 25 except for when numClusters is very small.
+ *
+ * @param numClusters Number of clusters
+ * @param numFeatures Number of features
+ */
+ private[clustering] def shouldDistributeGaussians(
+ numClusters: Int,
+ numFeatures: Int): Boolean = {
+ ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
+ }
+
+ /**
+ * Convert an n * (n + 1) / 2 dimension array representing the upper
triangular part of a matrix
+ * into an n * n array representing the full symmetric matrix.
+ *
+ * @param n The order of the n by n matrix.
+ * @param triangularValues The upper triangular part of the matrix
packed in an array
+ * (column major).
+ * @return An array which represents the symmetric matrix in column
major.
+ */
+ private[clustering] def unpackUpperTriangularMatrix(
+ n: Int,
+ triangularValues: Array[Double]): Array[Double] = {
+ val symmetricValues = new Array[Double](n * n)
+ var r = 0
+ var i = 0
+ while(i < n) {
+ var j = 0
+ while (j <= i) {
+ symmetricValues(i * n + j) = triangularValues(r)
+ symmetricValues(j * n + i) = triangularValues(r)
+ r += 1
+ j += 1
+ }
+ i += 1
+ }
+ symmetricValues
+ }
+
+ /**
+ * Update the weight, mean and covariance of gaussian distribution.
+ *
+ * @param mean The mean of the gaussian distribution.
+ * @param cov The covariance matrix of the gaussian distribution. Note
we only
+ * save the upper triangular part as a dense vector.
+ * @param weight The weight of the gaussian distribution.
+ * @param sumWeights The sum of weights of all clusters.
+ * @return The updated weight, mean and covariance.
+ */
+ private[clustering] def updateWeightsAndGaussians(
+ mean: DenseVector,
+ cov: DenseVector,
+ weight: Double,
+ sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
+ BLAS.scal(1.0 / weight, mean)
+ BLAS.spr(-weight, mean, cov)
+ BLAS.scal(1.0 / weight, cov)
+ val newWeight = weight / sumWeights
+ val newGaussian = (mean, cov)
+ (newWeight, newGaussian)
+ }
+}
+
+/**
+ * ExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution
in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian
(Normal) Distribution
+ * in the mixture. Note only upper triangular part of
the covariance
+ * matrix of each distribution is stored as dense
vector in order to
+ * reduce shuffled data size.
+ */
+private class ExpectationAggregator(
+ numFeatures: Int,
+ bcWeights: Broadcast[Array[Double]],
+ bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends
Serializable {
+
+ private val k: Int = bcWeights.value.length
+ private var totalCnt: Long = 0L
+ private var newLogLikelihood: Double = 0.0
+ private val newWeights: Array[Double] = new Array[Double](k)
+ private val newMeans: Array[DenseVector] = Array.fill(k)(
+ new DenseVector(Array.fill[Double](numFeatures)(0.0)))
+ private val newCovs: Array[DenseVector] = Array.fill(k)(
+ new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) /
2)(0.0)))
+
+ @transient private lazy val oldGaussians = {
+ bcGaussians.value.map { case (mean, covVec) =>
+ val cov = new DenseMatrix(numFeatures, numFeatures,
+ GaussianMixture.unpackUpperTriangularMatrix(numFeatures,
covVec.values))
+ new MultivariateGaussian(mean, cov)
+ }
+ }
+
+ def count: Long = totalCnt
+
+ def logLikelihood: Double = newLogLikelihood
+
+ def weights: Array[Double] = newWeights
+
+ def means: Array[DenseVector] = newMeans
+
+ def covs: Array[DenseVector] = newCovs
+
+ /**
+ * Add a new training instance to this ExpectationAggregator, update the
weights,
+ * means and covariances for each distributions, and update the log
likelihood.
+ *
+ * @param instance The instance of data point to be added.
+ * @return This ExpectationAggregator object.
+ */
+ def add(instance: Vector): this.type = {
+ val localWeights = bcWeights.value
+ val localOldGaussians = oldGaussians
+
+ val prob = new Array[Double](k)
+ var probSum = 0.0
+ var i = 0
+ while(i < k) {
--- End diff --
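And in several other places
[Note: the comment is attached to the diff line `while(i < k) {` directly
above, so it apparently refers to the missing space after `while`; the same
formatting appears in `while(i < n) {` earlier in this diff.]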
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]