[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-12 Thread srowen
Github user srowen closed the pull request at:

https://github.com/apache/spark/pull/15342





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82736118
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -531,6 +471,7 @@ object KMeans {
*   "k-means||". (default: "k-means||")
*/
   @Since("0.8.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
   def train(
--- End diff --

+1 @sethah 





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82735096
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -499,13 +414,38 @@ object KMeans {
* @param data Training points as an `RDD` of `Vector` types.
* @param k Number of clusters to create.
* @param maxIterations Maximum number of iterations allowed.
+   * @param initializationMode The initialization algorithm. This can either be "random" or
+   *   "k-means||". (default: "k-means||")
+   * @param seed Random seed for cluster initialization. Default is to generate seed based
+   * on system time.
+   */
+  @Since("2.1.0")
+  def train(data: RDD[Vector],
--- End diff --

+1 @sethah 





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82734529
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = 

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82691339
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -531,6 +471,7 @@ object KMeans {
*   "k-means||". (default: "k-means||")
*/
   @Since("0.8.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
   def train(
--- End diff --

This signature does not have a direct alternative without `runs`.
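For reference, a sketch of the mismatch (signatures reconstructed from the surrounding doc comments, not quoted verbatim from KMeans.scala): the overload being deprecated takes `runs` plus `initializationMode`, while the runs-free overload this PR adds also requires a `seed`, so there is no parameter-for-parameter replacement.

```scala
// Sketch only: the overload carrying this @deprecated annotation.
@Since("0.8.0")
@deprecated("Use train method without 'runs'", "2.1.0")
def train(
    data: RDD[Vector],
    k: Int,
    maxIterations: Int,
    runs: Int,
    initializationMode: String): KMeansModel = ???

// Sketch only: the new runs-free overload added elsewhere in this diff; note the extra seed.
@Since("2.1.0")
def train(
    data: RDD[Vector],
    k: Int,
    maxIterations: Int,
    initializationMode: String,
    seed: Long): KMeansModel = ???
```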





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82690396
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -499,13 +414,38 @@ object KMeans {
* @param data Training points as an `RDD` of `Vector` types.
* @param k Number of clusters to create.
* @param maxIterations Maximum number of iterations allowed.
+   * @param initializationMode The initialization algorithm. This can either be "random" or
+   *   "k-means||". (default: "k-means||")
+   * @param seed Random seed for cluster initialization. Default is to generate seed based
+   * on system time.
+   */
+  @Since("2.1.0")
+  def train(data: RDD[Vector],
--- End diff --

minor: style should match other train signatures
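For context, a sketch (illustrative only, not a quote of the file) of the parameter layout the other train overloads use, with nothing after the opening parenthesis and each parameter on its own line:

```scala
def train(
    data: RDD[Vector],
    k: Int,
    maxIterations: Int,
    initializationMode: String,
    seed: Long): KMeansModel = {
  // body elided in this sketch
  ???
}
```

whereas the overload as written in the diff starts its first parameter on the `def` line.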





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82687576
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

Yep, you're correct. Thanks!



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82682188
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

I don't think it can be done the way you're suggesting; it's not just 
preference. You could just set it with a nice simple call `.forall` as you're 
suggesting, usually, but here we also need the side effect of visiting each 
element. To do both I think we 
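A sketch of the trade-off (illustrative only, reusing the names from the quoted diff, not the PR's code): `forall` would state the convergence test compactly, but it short-circuits on the first center that moved, so later centers would never be rescaled and written back.

```scala
// NOT the PR's code -- shows why a bare forall is not enough here.
converged = totalContribs.forall { case (j, (sum, count)) =>
  scal(1.0 / count, sum)
  val newCenter = new VectorWithNorm(sum)
  val moved = KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon
  centers(j) = newCenter
  !moved  // returning false stops the traversal, skipping the updates above for later centers
}
```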

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82681907
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = false
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82654945
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = false
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82653577
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

The logic is the same, yes, but it seems really strange to set something to false, then each iteration set it to true and then set it back to false under some condition. Why not leave it false and change it to true once the convergence criterion is met? This is basically a

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82652315
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

Unless I'm overlooking some obviously nicer expression, I think the loop is 
going to work the same either way: you have to assume you terminate unless a 
distance proves otherwise, per iteration.
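Reassembled from the quoted diff (the email wrapping mangles it above), the pattern being defended is: assume convergence at the top of the iteration and let any center that moved more than epsilon falsify it, while still updating every center.

```scala
converged = true
totalContribs.foreach { case (j, (sum, count)) =>
  scal(1.0 / count, sum)
  val newCenter = new VectorWithNorm(sum)
  if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
    converged = false  // one moved center is enough; the guard avoids further distance checks
  }
  centers(j) = newCenter  // the write-back must happen for every center regardless
}
```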



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82650874
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
--- End diff --

In general `while` is faster than `foreach` (creating and calling an 
anonymous function), but I'd be surprised if it affected performance here 
because we are only running this once per iteration 
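A hypothetical micro-example (not from the PR) of the two loop styles being compared: `foreach` goes through an anonymous function per element while `while` does not, which matters far more in a per-point hot path than in a once-per-iteration loop over k entries.

```scala
val counts = Array(3L, 0L, 5L)

// foreach style: concise, invokes a closure per element
counts.indices.foreach { j =>
  println(s"center $j has ${counts(j)} points")
}

// while style: no closure, a little more code
var j = 0
while (j < counts.length) {
  println(s"center $j has ${counts(j)} points")
  j += 1
}
```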

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82649893
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = false
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82649829
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = false
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82648980
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
--- End diff --

Why is that? I'm aware that Scala `for` comprehensions can desugar into 
something surprisingly expensive, but this seems clearer and about the same as 
a `while`
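A small illustration (hypothetical, not from the PR) of the desugaring alluded to here: a two-generator for/yield turns into nested flatMap/map calls, each taking its own closure.

```scala
val pairs = for (i <- 0 until 2; j <- 0 until 3) yield (i, j)
// desugars roughly to:
val pairsDesugared = (0 until 2).flatMap(i => (0 until 3).map(j => (i, j)))
// both produce the same IndexedSeq: (0,0), (0,1), (0,2), (1,0), (1,1), (1,2)
```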



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82648993
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -43,21 +43,21 @@ import org.apache.spark.util.random.XORShiftRandom
 class KMeans private (
 private var k: Int,
 private var maxIterations: Int,
-private var runs: Int,
 private var initializationMode: String,
 private var initializationSteps: Int,
 private var epsilon: Double,
 private var seed: Long) extends Serializable with Logging {
 
   /**
-   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20,
 * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}.
 */
   @Since("0.8.0")
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
+  def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
 
   /**
-   * Number of clusters to create (k).
+   * Number of clusters to create (k). Note that if the input has fewer than k elements,
+   * then it's possible that fewer than k clusters are created.
--- End diff --

Ooops, right





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82646051
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -558,6 +475,7 @@ object KMeans {
* Trains a k-means model using specified parameters and the default values for unspecified.
*/
   @Since("0.8.0")
+  @deprecated("Use train method without 'runs'", "2.1.0")
--- End diff --

I think we should add them for completeness, and deprecate all overloads 
using `runs`.
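A sketch of the direction suggested here (illustrative only, not the PR's actual change): keep each `runs` overload, mark it deprecated with the same message, and have it forward to a runs-free counterpart.

```scala
// Sketch only: a deprecated overload dropping 'runs' when forwarding.
@Since("0.8.0")
@deprecated("Use train method without 'runs'", "2.1.0")
def train(
    data: RDD[Vector],
    k: Int,
    maxIterations: Int,
    runs: Int): KMeansModel =
  train(data, k, maxIterations)
```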





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82634889
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
+  converged = false
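
For readers skimming the diff above, here is a minimal, self-contained sketch of the per-point aggregation the new single-run loop performs (plain Scala with illustrative names, not the MLlib code itself): every point adds its vector and a count of 1 to its closest center, and each non-empty cluster's mean becomes the new center.

```scala
// Illustrative sketch only (hypothetical names); mirrors the sums/counts
// aggregation in the diff above without Spark or the MLlib vector types.
object LloydContribs {
  def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  def closest(centers: Array[Array[Double]], p: Array[Double]): Int =
    centers.indices.minBy { j =>
      centers(j).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum
    }

  def main(args: Array[String]): Unit = {
    val centers = Array(Array(0.0, 0.0), Array(10.0, 10.0))
    val points = Seq(Array(1.0, 1.0), Array(9.0, 9.0), Array(0.0, 2.0))

    // Per-cluster running sums and counts, as in the mapPartitions body
    val sums = Array.fill(centers.length)(Array.fill(2)(0.0))
    val counts = Array.fill(centers.length)(0L)
    points.foreach { p =>
      val j = closest(centers, p)
      sums(j) = add(sums(j), p)
      counts(j) += 1
    }

    // New center = sum / count for every non-empty cluster
    val newCenters = counts.indices.filter(counts(_) > 0)
      .map(j => sums(j).map(_ / counts(j)))
    newCenters.foreach(c => println(c.mkString("[", ", ", "]")))
  }
}
```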
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82625386
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

Why don't we just leave converged false, and only change it to true inside 
the foreach?
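
For illustration, a minimal self-contained sketch (plain Scala, hypothetical names and data, not the PR's code) of the convergence test under discussion: once with a mutable flag initialized to true and cleared when any center moves, and once as an equivalent single ```forall``` expression that needs no flag at all.

```scala
// Illustrative sketch only; the check says "converged" when no center moved
// by more than epsilon between iterations.
object ConvergenceCheck {
  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def main(args: Array[String]): Unit = {
    val epsilon = 1e-4
    val oldCenters = Array(Array(0.0, 0.0), Array(5.0, 5.0))
    val newCenters = Array(Array(0.00001, 0.0), Array(5.0, 5.00002))

    // Mutable-flag style, as in the PR's update loop
    var converged = true
    newCenters.indices.foreach { j =>
      if (converged && squaredDistance(newCenters(j), oldCenters(j)) > epsilon * epsilon) {
        converged = false
      }
    }

    // Equivalent single-expression style, computed in one pass
    val convergedAlt = newCenters.indices.forall { j =>
      squaredDistance(newCenters(j), oldCenters(j)) <= epsilon * epsilon
    }

    println(s"converged = $converged, convergedAlt = $convergedAlt")
  }
}
```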



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82633045
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) 
> epsilon * epsilon) {
+  converged = false
   

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82625743
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) 
> epsilon * epsilon) {
+  converged = 

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82626968
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
--- End diff --

I think ```changed``` would be more intuitive.



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82627280
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
--- End diff --

Compared with the original code, ```foreach``` may be slower than a ```while``` loop if you have a large ```k```.
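
For what it's worth, a self-contained sketch (plain Scala, illustrative names and data, not the MLlib types) of the two update styles being compared: a ```foreach``` over the collected per-cluster contributions versus an indexed ```while``` loop.

```scala
// Illustrative sketch only; both styles compute the same new centers.
object UpdateStyles {
  def main(args: Array[String]): Unit = {
    val k = 4
    val dims = 3
    // (clusterIndex, (sumOfPoints, count)), as produced by the reduce step
    val totalContribs: Map[Int, (Array[Double], Long)] =
      (0 until k).map(j => (j, (Array.fill(dims)(j + 1.0), (j + 1).toLong))).toMap
    val centers = Array.fill(k)(Array.fill(dims)(0.0))

    // Style 1: foreach over the map entries (functional, as in the PR)
    totalContribs.foreach { case (j, (sum, count)) =>
      centers(j) = sum.map(_ / count)
    }

    // Style 2: indexed while loop (closer to the pre-PR style); recomputes the
    // same result, but avoids a closure call per entry, which may matter for
    // very large k
    val entries = totalContribs.toArray
    var i = 0
    while (i < entries.length) {
      val (j, (sum, count)) = entries(i)
      centers(j) = sum.map(_ / count)
      i += 1
    }

    println(centers.map(_.mkString("[", ", ", "]")).mkString("\n"))
  }
}
```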



[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82624159
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -258,149 +252,106 @@ class KMeans private (
 }
 }
 val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
-logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
-  " seconds.")
+logInfo(f"Initialization with $initializationMode took 
$initTimeInSeconds%.3f seconds.")
 
-val active = Array.fill(numRuns)(true)
-val costs = Array.fill(numRuns)(0.0)
-
-var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+var converged = false
+var cost = 0.0
 var iteration = 0
 
 val iterationStartTime = System.nanoTime()
 
-instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
+instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
-// Execute iterations of Lloyd's algorithm until all runs have converged
-while (iteration < maxIterations && !activeRuns.isEmpty) {
-  type WeightedPoint = (Vector, Long)
-  def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
-axpy(1.0, x._1, y._1)
-(y._1, x._2 + y._2)
-  }
-
-  val activeCenters = activeRuns.map(r => centers(r)).toArray
-  val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
-
-  val bcActiveCenters = sc.broadcast(activeCenters)
+// Execute iterations of Lloyd's algorithm until converged
+while (iteration < maxIterations && !converged) {
+  val costAccum = sc.doubleAccumulator
+  val bcCenters = sc.broadcast(centers)
 
   // Find the sum and count of points mapping to each center
   val totalContribs = data.mapPartitions { points =>
-val thisActiveCenters = bcActiveCenters.value
-val runs = thisActiveCenters.length
-val k = thisActiveCenters(0).length
-val dims = thisActiveCenters(0)(0).vector.size
+val thisCenters = bcCenters.value
+val dims = thisCenters.head.vector.size
 
-val sums = Array.fill(runs, k)(Vectors.zeros(dims))
-val counts = Array.fill(runs, k)(0L)
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
+val counts = Array.fill(thisCenters.length)(0L)
 
 points.foreach { point =>
-  (0 until runs).foreach { i =>
-val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
-costAccums(i).add(cost)
-val sum = sums(i)(bestCenter)
-axpy(1.0, point.vector, sum)
-counts(i)(bestCenter) += 1
-  }
+  val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
+  costAccum.add(cost)
+  val sum = sums(bestCenter)
+  axpy(1.0, point.vector, sum)
+  counts(bestCenter) += 1
 }
 
-val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
-  ((i, j), (sums(i)(j), counts(i)(j)))
-}
-contribs.iterator
-  }.reduceByKey(mergeContribs).collectAsMap()
-
-  bcActiveCenters.destroy(blocking = false)
-
-  // Update the cluster centers and costs for each active run
-  for ((run, i) <- activeRuns.zipWithIndex) {
-var changed = false
-var j = 0
-while (j < k) {
-  val (sum, count) = totalContribs((i, j))
-  if (count != 0) {
-scal(1.0 / count, sum)
-val newCenter = new VectorWithNorm(sum)
-if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
-  changed = true
-}
-centers(run)(j) = newCenter
-  }
-  j += 1
-}
-if (!changed) {
-  active(run) = false
-  logInfo("Run " + run + " finished in " + (iteration + 1) + " 
iterations")
+counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), 
counts(j.iterator
+  }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+axpy(1.0, sum2, sum1)
+(sum1, count1 + count2)
+  }.collectAsMap()
+
+  bcCenters.destroy(blocking = false)
+
+  // Update the cluster centers and costs
+  converged = true
+  totalContribs.foreach { case (j, (sum, count)) =>
+scal(1.0 / count, sum)
+val newCenter = new VectorWithNorm(sum)
+if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) 
> epsilon * epsilon) {
+  converged = 

[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82625915
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -409,74 +360,39 @@ class KMeans private (
   bcNewCentersList += bcNewCenters
   val preCosts = costs
   costs = data.zip(preCosts).map { case (point, cost) =>
-  Array.tabulate(runs) { r =>
-math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-  }
-}.persist(StorageLevel.MEMORY_AND_DISK)
-  val sumCosts = costs
-.aggregate(new Array[Double](runs))(
-  seqOp = (s, v) => {
-// s += v
-var r = 0
-while (r < runs) {
-  s(r) += v(r)
-  r += 1
-}
-s
-  },
-  combOp = (s0, s1) => {
-// s0 += s1
-var r = 0
-while (r < runs) {
-  s0(r) += s1(r)
-  r += 1
-}
-s0
-  }
-)
+math.min(KMeans.pointCost(bcNewCenters.value, point), cost)
+  }.persist(StorageLevel.MEMORY_AND_DISK)
+  val sumCosts = costs.sum()
 
   bcNewCenters.unpersist(blocking = false)
   preCosts.unpersist(blocking = false)
 
-  val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
+  val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) =>
 val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
-pointsWithCosts.flatMap { case (p, c) =>
-  val rs = (0 until runs).filter { r =>
-rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
-  }
-  if (rs.nonEmpty) Some((p, rs)) else None
-}
+pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1)
   }.collect()
-  mergeNewCenters()
-  chosen.foreach { case (p, rs) =>
-rs.foreach(newCenters(_) += p.toDense)
-  }
+  newCenters = chosen.map(_.toDense)
--- End diff --

Ditto.
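
As a side note for readers, a minimal self-contained sketch (plain Scala with made-up data, not the MLlib internals) of the k-means|| oversampling step shown in the quoted diff: each point is kept as a candidate center with probability 2 * c * k / sumCosts, so points that are far from all current centers are more likely to be picked, and roughly 2 * k candidates are drawn per step on average.

```scala
import scala.util.Random

// Illustrative sketch only (hypothetical names and toy data).
object KMeansParallelStep {
  def main(args: Array[String]): Unit = {
    val k = 3
    val rand = new Random(42)
    // (point, cost to its nearest already-chosen center)
    val pointCosts = Seq(
      (Array(0.0, 0.0), 0.5),
      (Array(5.0, 5.0), 9.0),
      (Array(9.0, 1.0), 16.0)
    )
    val sumCosts = pointCosts.map(_._2).sum
    // High-cost points (far from every current center) are the most likely picks
    val chosen = pointCosts
      .filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }
      .map(_._1)
    println(s"chose ${chosen.length} candidate centers")
  }
}
```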





[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...

2016-10-10 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/15342#discussion_r82619472
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -43,21 +43,21 @@ import org.apache.spark.util.random.XORShiftRandom
 class KMeans private (
 private var k: Int,
 private var maxIterations: Int,
-private var runs: Int,
 private var initializationMode: String,
 private var initializationSteps: Int,
 private var epsilon: Double,
 private var seed: Long) extends Serializable with Logging {
 
   /**
-   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20,
* initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}.
*/
   @Since("0.8.0")
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
+  def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
 
   /**
-   * Number of clusters to create (k).
+   * Number of clusters to create (k). Note that if the input has fewer than k elements,
+   * then it's possible that fewer than k clusters are created.
--- End diff --

If we back out the change that avoids duplicate centroids, does this annotation become invalid?
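
For reference, a small sketch of the behavior the annotation describes, built on the public ```KMeans.train(data, k, maxIterations)``` API; the local-mode setup, the toy data, and the exact number of returned centers are assumptions for illustration, since the outcome depends on the initialization mode and Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Illustrative sketch: with fewer distinct input points than k, the returned
// model may contain fewer than k cluster centers, per the annotation above.
object FewerThanKExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("few-points"))
    val data = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.0),
      Vectors.dense(1.0, 1.0)
    ))
    // Request k = 5 clusters from only 2 points, 10 iterations
    val model = KMeans.train(data, 5, 10)
    println(s"requested k = 5, got ${model.clusterCenters.length} centers")
    sc.stop()
  }
}
```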

