GitHub user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15413#discussion_r94084799
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
    @@ -323,27 +326,95 @@ class GaussianMixture @Since("2.0.0") (
       @Since("2.0.0")
       def setSeed(value: Long): this.type = set(seed, value)
     
    +  /**
    +   * Number of samples per cluster to use when initializing Gaussians.
    +   */
    +  private val numSamples = 5
    +
       @Since("2.0.0")
       override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
         transformSchema(dataset.schema, logging = true)
    -    val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
    -      case Row(point: Vector) => OldVectors.fromML(point)
    -    }
     
    -    val instr = Instrumentation.create(this, rdd)
    +    val sc = dataset.sparkSession.sparkContext
    +    val numClusters = $(k)
    +
    +    val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
    +      case Row(features: Vector) => features
    +    }.cache()
    +
    +    // Extract the number of features.
    +    val numFeatures = instances.first().size
    +
    +    val instr = Instrumentation.create(this, instances)
         instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
     
    -    val algo = new MLlibGM()
    -      .setK($(k))
    -      .setMaxIterations($(maxIter))
    -      .setSeed($(seed))
    -      .setConvergenceTol($(tol))
    -    val parentModel = algo.run(rdd)
    -    val gaussians = parentModel.gaussians.map { case g =>
    -      new MultivariateGaussian(g.mu.asML, g.sigma.asML)
    +    val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
    +      numClusters, numFeatures)
    +
    +    // TODO: SPARK-15785 Support user-supplied initial GMM.
    +    val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
    +
    +    var logLikelihood = Double.MinValue
    +    var logLikelihoodPrev = 0.0
    +
    +    var iter = 0
    +    while (iter < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
    +
    +      val bcWeights = instances.sparkContext.broadcast(weights)
    +      val bcGaussians = instances.sparkContext.broadcast(gaussians)
    +
    +      // Aggregate the cluster contributions for all sample points.
    +      val sums = instances.treeAggregate(
    +        new ExpectationAggregator(numFeatures, bcWeights, bcGaussians))(
    +        seqOp = (c, v) => (c, v) match {
    +          case (aggregator, instance) => aggregator.add(instance)
    +        },
    +        combOp = (c1, c2) => (c1, c2) match {
    +          case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
    +        })
    +
    +      bcWeights.destroy(blocking = false)
    +      bcGaussians.destroy(blocking = false)
    +
    +      /*
    +         Create new distributions based on the partial assignments
    +         (often referred to as the "M" step in the literature).
    +       */
    +      val sumWeights = sums.weights.sum
    +
    +      if (shouldDistributeGaussians) {
    +        val numPartitions = math.min(numClusters, 1024)
    +        val tuples = Seq.tabulate(numClusters) { i =>
    +          (sums.means(i), sums.covs(i), sums.weights(i))
    +        }
    +        val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, cov, weight) =>
    +          GaussianMixture.updateWeightsAndGaussians(mean, cov, weight, sumWeights)
    +        }.collect().unzip
    +        Array.copy(ws.toArray, 0, weights, 0, ws.length)
    +        Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
    +      } else {
    +        var i = 0
    +        while (i < numClusters) {
    +          val (weight, gaussian) = GaussianMixture.updateWeightsAndGaussians(
    +            sums.means(i), sums.covs(i), sums.weights(i), sumWeights)
    +          weights(i) = weight
    +          gaussians(i) = gaussian
    +          i += 1
    +        }
    +      }
    +
    +      logLikelihoodPrev = logLikelihood   // current becomes previous
    +      logLikelihood = sums.logLikelihood  // this is the freshly computed log-likelihood
    +      iter += 1
         }
    -    val model = copyValues(new GaussianMixtureModel(uid, parentModel.weights, gaussians))
    -      .setParent(this)
    +
    +    val gaussianDists = gaussians.map { case (mean, covVec) =>
    +      val cov = new DenseMatrix(numFeatures, numFeatures,
    +        GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
    +      new MultivariateGaussian(mean, cov)
    +    }
    +
    +    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
    --- End diff --
    
    Below here, we call logNumFeatures. This isn't part of your PR, but could you move it earlier, since numFeatures is available before the algorithm runs?
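
    A minimal sketch of the suggested reordering (logNumFeatures takes the
    feature count; exact placement is up to you):

        val numFeatures = instances.first().size

        val instr = Instrumentation.create(this, instances)
        instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
        // Log the feature count as soon as it is known, rather than after
        // the algorithm has finished running.
        instr.logNumFeatures(numFeatures)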
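
    For context on the E step above: treeAggregate folds each record into a
    per-partition aggregator with seqOp and then merges aggregators pairwise
    up a tree with combOp. A minimal standalone sketch of the same pattern
    (SumCounter is a hypothetical stand-in for ExpectationAggregator, not a
    Spark class):

        import org.apache.spark.sql.SparkSession

        // Hypothetical aggregator playing the role of ExpectationAggregator.
        class SumCounter extends Serializable {
          var sum = 0.0
          var count = 0L
          def add(v: Double): this.type = { sum += v; count += 1; this }
          def merge(other: SumCounter): this.type = {
            sum += other.sum; count += other.count; this
          }
        }

        object TreeAggregateDemo {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
            val data = spark.sparkContext.parallelize((1 to 100).map(_.toDouble))
            // seqOp folds one record into an aggregator; combOp merges two
            // aggregators, mirroring add/merge in the loop above.
            val agg = data.treeAggregate(new SumCounter)(
              seqOp = (c, v) => c.add(v),
              combOp = (c1, c2) => c1.merge(c2))
            println(s"sum=${agg.sum}, count=${agg.count}")  // sum=5050.0, count=100
            spark.stop()
          }
        }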
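
    And on the covariance handling at the end: the covariances are kept as
    packed upper-triangular arrays and only expanded into full symmetric
    matrices when the MultivariateGaussians are built. An illustrative sketch
    of that expansion for a column-major packed layout (this is not the Spark
    implementation of unpackUpperTriangularMatrix, just the idea):

        // Expand a column-major packed upper-triangular array of length
        // n * (n + 1) / 2 into a full column-major symmetric n x n matrix.
        def unpackUpperTriangular(n: Int, packed: Array[Double]): Array[Double] = {
          require(packed.length == n * (n + 1) / 2)
          val full = new Array[Double](n * n)
          var idx = 0
          var j = 0
          while (j < n) {
            var i = 0
            while (i <= j) {
              full(j * n + i) = packed(idx)  // entry (i, j) of the upper triangle
              full(i * n + j) = packed(idx)  // mirrored to (j, i)
              idx += 1
              i += 1
            }
            j += 1
          }
          full
        }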

