Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15342#discussion_r82681907
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
    @@ -258,149 +252,106 @@ class KMeans private (
             }
         }
         val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    -    logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
    -      " seconds.")
    +    logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
     
    -    val active = Array.fill(numRuns)(true)
    -    val costs = Array.fill(numRuns)(0.0)
    -
    -    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
    +    var converged = false
    +    var cost = 0.0
         var iteration = 0
     
         val iterationStartTime = System.nanoTime()
     
    -    instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
    +    instr.foreach(_.logNumFeatures(centers.head.vector.size))
     
    -    // Execute iterations of Lloyd's algorithm until all runs have converged
    -    while (iteration < maxIterations && !activeRuns.isEmpty) {
    -      type WeightedPoint = (Vector, Long)
    -      def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
    -        axpy(1.0, x._1, y._1)
    -        (y._1, x._2 + y._2)
    -      }
    -
    -      val activeCenters = activeRuns.map(r => centers(r)).toArray
    -      val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
    -
    -      val bcActiveCenters = sc.broadcast(activeCenters)
    +    // Execute iterations of Lloyd's algorithm until converged
    +    while (iteration < maxIterations && !converged) {
    +      val costAccum = sc.doubleAccumulator
    +      val bcCenters = sc.broadcast(centers)
     
           // Find the sum and count of points mapping to each center
           val totalContribs = data.mapPartitions { points =>
    -        val thisActiveCenters = bcActiveCenters.value
    -        val runs = thisActiveCenters.length
    -        val k = thisActiveCenters(0).length
    -        val dims = thisActiveCenters(0)(0).vector.size
    +        val thisCenters = bcCenters.value
    +        val dims = thisCenters.head.vector.size
     
    -        val sums = Array.fill(runs, k)(Vectors.zeros(dims))
    -        val counts = Array.fill(runs, k)(0L)
    +        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
    +        val counts = Array.fill(thisCenters.length)(0L)
     
             points.foreach { point =>
    -          (0 until runs).foreach { i =>
    -            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
    -            costAccums(i).add(cost)
    -            val sum = sums(i)(bestCenter)
    -            axpy(1.0, point.vector, sum)
    -            counts(i)(bestCenter) += 1
    -          }
    +          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
    +          costAccum.add(cost)
    +          val sum = sums(bestCenter)
    +          axpy(1.0, point.vector, sum)
    +          counts(bestCenter) += 1
             }
     
    -        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
    -          ((i, j), (sums(i)(j), counts(i)(j)))
    -        }
    -        contribs.iterator
    -      }.reduceByKey(mergeContribs).collectAsMap()
    -
    -      bcActiveCenters.destroy(blocking = false)
    -
    -      // Update the cluster centers and costs for each active run
    -      for ((run, i) <- activeRuns.zipWithIndex) {
    -        var changed = false
    -        var j = 0
    -        while (j < k) {
    -          val (sum, count) = totalContribs((i, j))
    -          if (count != 0) {
    -            scal(1.0 / count, sum)
    -            val newCenter = new VectorWithNorm(sum)
    -            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
    -              changed = true
    -            }
    -            centers(run)(j) = newCenter
    -          }
    -          j += 1
    -        }
    -        if (!changed) {
    -          active(run) = false
    -          logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
    +        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j)))).iterator
    +      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
    +        axpy(1.0, sum2, sum1)
    +        (sum1, count1 + count2)
    +      }.collectAsMap()
    +
    +      bcCenters.destroy(blocking = false)
    +
    +      // Update the cluster centers and costs
    +      converged = true
    +      totalContribs.foreach { case (j, (sum, count)) =>
    +        scal(1.0 / count, sum)
    +        val newCenter = new VectorWithNorm(sum)
    +        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) 
> epsilon * epsilon) {
    +          converged = false
             }
    -        costs(run) = costAccums(i).value
    +        centers(j) = newCenter
           }
     
    -      activeRuns = activeRuns.filter(active(_))
    +      cost = costAccum.value
           iteration += 1
         }
     
         val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    -    logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")
    +    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
     
         if (iteration == maxIterations) {
           logInfo(s"KMeans reached the max number of iterations: 
$maxIterations.")
         } else {
           logInfo(s"KMeans converged in $iteration iterations.")
         }
     
    -    val (minCost, bestRun) = costs.zipWithIndex.min
    +    logInfo(s"The cost is $cost.")
     
    -    logInfo(s"The cost for the best run is $minCost.")
    -
    -    new KMeansModel(centers(bestRun).map(_.vector))
    +    new KMeansModel(centers.map(_.vector))
       }
     
       /**
    -   * Initialize `runs` sets of cluster centers at random.
    +   * Initialize a set of cluster centers at random.
        */
    -  private def initRandom(data: RDD[VectorWithNorm])
    -  : Array[Array[VectorWithNorm]] = {
    -    // Sample all the cluster centers in one pass to avoid repeated scans
    -    val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq
    -    Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
    -      new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)
    -    }.toArray)
    +  private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
    +    data.takeSample(true, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense)
       }
     
       /**
    -   * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
    +   * Initialize a set of cluster centers using the k-means|| algorithm by Bahmani et al.
        * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
    -   * to find with dissimilar cluster centers by starting with a random center and then doing
    +   * to find dissimilar cluster centers by starting with a random center and then doing
        * passes where more centers are chosen with probability proportional to their squared distance
        * to the current cluster set. It results in a provable approximation to an optimal clustering.
        *
        * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
        */
    -  private def initKMeansParallel(data: RDD[VectorWithNorm])
    -  : Array[Array[VectorWithNorm]] = {
    +  private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
         // Initialize empty centers and point costs.
    -    val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
    -    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
    +    var costs = data.map(_ => Double.PositiveInfinity)
     
    -    // Initialize each run's first center to a random point.
    +    // Initialize the first center to a random point.
         val seed = new XORShiftRandom(this.seed).nextInt()
    -    val sample = data.takeSample(true, runs, seed).toSeq
    +    val sample = data.takeSample(false, 1, seed)
         // Could be empty if data is empty; fail with a better message early:
    -    require(sample.size >= runs, s"Required $runs samples but got ${sample.size} from $data")
    -    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))
    -
    -    /** Merges new centers to centers. */
    -    def mergeNewCenters(): Unit = {
    -      var r = 0
    -      while (r < runs) {
    -        centers(r) ++= newCenters(r)
    -        newCenters(r).clear()
    -        r += 1
    -      }
    -    }
    +    require(sample.nonEmpty, s"No samples available from $data")
     
    -    // On each step, sample 2 * k points on average for each run with probability proportional
    -    // to their squared distance from that run's centers. Note that only distances between points
    +    val centers = ArrayBuffer[VectorWithNorm]()
    +    var newCenters = Seq(sample.head.toDense)
    --- End diff --
    
    It's a Seq, but yeah, no specific preference. Where do you see that optimization, BTW? I just see it implemented for `TraversableOnce`, and that's what my IDE says it calls even when given an `Array`.
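    
    (Aside, not code from the PR: a minimal standalone sketch of the dispatch question above, assuming Scala 2.11-era collections; the object name is made up. A `Seq` and an `Array` both end up on the same `++=` declared for `TraversableOnce`, since the `Array` is implicitly wrapped, so any `Array`-specific fast path would have to live inside that method rather than in a separate overload.)
    
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical check, not part of the PR: which ++= each call resolves to.
    object AppendDispatchCheck {
      def main(args: Array[String]): Unit = {
        val centers = ArrayBuffer[Int]()
        val fromSeq: Seq[Int] = Seq(1, 2, 3)
        val fromArray: Array[Int] = Array(4, 5, 6)
    
        centers ++= fromSeq   // resolves to ++=(xs: TraversableOnce[Int])
        centers ++= fromArray // Array is implicitly wrapped; same ++= signature
        println(centers.mkString(", ")) // prints: 1, 2, 3, 4, 5, 6
      }
    }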

