[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user srowen closed the pull request at: https://github.com/apache/spark/pull/15342 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82736118 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -531,6 +471,7 @@ object KMeans { * "k-means||". (default: "k-means||") */ @Since("0.8.0") + @deprecated("Use train method without 'runs'", "2.1.0") def train( --- End diff -- +1 @sethah --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82735096 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -499,13 +414,38 @@ object KMeans { * @param data Training points as an `RDD` of `Vector` types. * @param k Number of clusters to create. * @param maxIterations Maximum number of iterations allowed. + * @param initializationMode The initialization algorithm. This can either be "random" or + * "k-means||". (default: "k-means||") + * @param seed Random seed for cluster initialization. Default is to generate seed based + * on system time. + */ + @Since("2.1.0") + def train(data: RDD[Vector], --- End diff -- +1 @sethah --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82734529 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) 
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged =
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82691339 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -531,6 +471,7 @@ object KMeans { * "k-means||". (default: "k-means||") */ @Since("0.8.0") + @deprecated("Use train method without 'runs'", "2.1.0") def train( --- End diff -- This signature does not have a direct alternative without `runs`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82690396 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -499,13 +414,38 @@ object KMeans { * @param data Training points as an `RDD` of `Vector` types. * @param k Number of clusters to create. * @param maxIterations Maximum number of iterations allowed. + * @param initializationMode The initialization algorithm. This can either be "random" or + * "k-means||". (default: "k-means||") + * @param seed Random seed for cluster initialization. Default is to generate seed based + * on system time. + */ + @Since("2.1.0") + def train(data: RDD[Vector], --- End diff -- minor: style should match other train signatures --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82687576 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- Yep, you're correct. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working,
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82682188 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- I don't think it can be done the way you're suggesting; it's not just preference. You could just set it with a nice simple call `.forall` as you're suggesting, usually, but here we also need the side effect of visiting each element. To do both I think we
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82681907 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82654945 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82653577 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- The logic is the same, yes, but it seems really strange to set something to false, then each iteration set it to true and then set it back false if some condition. Why not leave it false and change to true if convergence criteria is met? This is basically a
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementation
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82652315 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- Unless I'm overlooking some obviously nicer expression, I think the loop is going to work the same either way: you have to assume you terminate unless a distance proves otherwise, per iteration. --- If your project is set up for it, you can reply to this
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82650874 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => --- End diff -- In general `while` is faster than `foreach` (creating and calling an anonymous function), but I'd be surprised if it affected performance here because we are only running this once per iteration
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82649893 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82649829 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82648980 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => --- End diff -- Why is that? I'm aware that Scala `for` comprehensions can desugar into something surprisingly expensive, but this seems clearer and about the same as a `while` --- If your project is set up for
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82648993 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -43,21 +43,21 @@ import org.apache.spark.util.random.XORShiftRandom class KMeans private ( private var k: Int, private var maxIterations: Int, -private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double, private var seed: Long) extends Serializable with Logging { /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}. */ @Since("0.8.0") - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) + def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) /** - * Number of clusters to create (k). + * Number of clusters to create (k). Note that if the input has fewer than k elements, + * then it's possible that fewer than k clusters are created. --- End diff -- Ooops, right --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82646051 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -558,6 +475,7 @@ object KMeans { * Trains a k-means model using specified parameters and the default values for unspecified. */ @Since("0.8.0") + @deprecated("Use train method without 'runs'", "2.1.0") --- End diff -- I think we should add them for completeness, and deprecate all overloads using `runs`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82634889 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82625386 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- Why don't we just leave converged false, and only change it to true inside the foreach? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82633045 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) +val 
sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82625743 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) 
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged =
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82626968 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) 
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true --- End diff -- I think ```changed``` would be more intuitive. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82627280 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) 
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => --- End diff -- Compared with the original code, ```foreach``` may be slower than a ```while``` loop if you have a large ```k```. --- If your project is set up for it, you can reply to this email and have your
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82624159 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -258,149 +252,106 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 -logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") +logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") -val active = Array.fill(numRuns)(true) -val costs = Array.fill(numRuns)(0.0) - -var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) +var converged = false +var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() -instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) +instr.foreach(_.logNumFeatures(centers.head.vector.size)) -// Execute iterations of Lloyd's algorithm until all runs have converged -while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { -axpy(1.0, x._1, y._1) -(y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) - - val bcActiveCenters = sc.broadcast(activeCenters) +// Execute iterations of Lloyd's algorithm until converged +while (iteration < maxIterations && !converged) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => -val thisActiveCenters = bcActiveCenters.value -val runs = thisActiveCenters.length -val k = thisActiveCenters(0).length -val dims = thisActiveCenters(0)(0).vector.size +val thisCenters = bcCenters.value +val dims = thisCenters.head.vector.size -val sums = Array.fill(runs, k)(Vectors.zeros(dims)) -val counts = Array.fill(runs, k)(0L) 
+val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) +val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => -val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) -costAccums(i).add(cost) -val sum = sums(i)(bestCenter) -axpy(1.0, point.vector, sum) -counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } -val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) -} -contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { -var changed = false -var j = 0 -while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { -scal(1.0 / count, sum) -val newCenter = new VectorWithNorm(sum) -if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true -} -centers(run)(j) = newCenter - } - j += 1 -} -if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") +counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j.iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => +axpy(1.0, sum2, sum1) +(sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + converged = true + totalContribs.foreach { case (j, (sum, count)) => +scal(1.0 / count, sum) +val newCenter = new VectorWithNorm(sum) +if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged =
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82625915 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -409,74 +360,39 @@ class KMeans private ( bcNewCentersList += bcNewCenters val preCosts = costs costs = data.zip(preCosts).map { case (point, cost) => - Array.tabulate(runs) { r => -math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) - } -}.persist(StorageLevel.MEMORY_AND_DISK) - val sumCosts = costs -.aggregate(new Array[Double](runs))( - seqOp = (s, v) => { -// s += v -var r = 0 -while (r < runs) { - s(r) += v(r) - r += 1 -} -s - }, - combOp = (s0, s1) => { -// s0 += s1 -var r = 0 -while (r < runs) { - s0(r) += s1(r) - r += 1 -} -s0 - } -) +math.min(KMeans.pointCost(bcNewCenters.value, point), cost) + }.persist(StorageLevel.MEMORY_AND_DISK) + val sumCosts = costs.sum() bcNewCenters.unpersist(blocking = false) preCosts.unpersist(blocking = false) - val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => + val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) -pointsWithCosts.flatMap { case (p, c) => - val rs = (0 until runs).filter { r => -rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) - } - if (rs.nonEmpty) Some((p, rs)) else None -} +pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1) }.collect() - mergeNewCenters() - chosen.foreach { case (p, rs) => -rs.foreach(newCenters(_) += p.toDense) - } + newCenters = chosen.map(_.toDense) --- End diff -- Ditto. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15342: [SPARK-11560] [MLLIB] Optimize KMeans implementat...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15342#discussion_r82619472 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala --- @@ -43,21 +43,21 @@ class KMeans private ( private var k: Int, private var maxIterations: Int, -private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double, private var seed: Long) extends Serializable with Logging { /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}. */ @Since("0.8.0") - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) + def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) /** - * Number of clusters to create (k). + * Number of clusters to create (k). Note that if the input has fewer than k elements, + * then it's possible that fewer than k clusters are created. --- End diff -- If we back out the change to avoid duplicate centroids, would this annotation become invalid? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org