Github user avulanov commented on a diff in the pull request:
https://github.com/apache/spark/pull/10806#discussion_r54823716
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
@@ -224,142 +261,133 @@ class KMeans private (
/**
* Implementation of K-Means algorithm.
*/
- private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {
+ private def runAlgorithm(
+ data: RDD[(DenseMatrix, DenseMatrix)],
+ centers: Array[VectorWithNorm]): KMeansModel = {
val sc = data.sparkContext
- val initStartTime = System.nanoTime()
-
- // Only one run is allowed when initialModel is given
- val numRuns = if (initialModel.nonEmpty) {
- if (runs > 1) logWarning("Ignoring runs; one run is allowed when
initialModel is given.")
- 1
- } else {
- runs
- }
-
- val centers = initialModel match {
- case Some(kMeansCenters) => {
- Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
- }
- case None => {
- if (initializationMode == KMeans.RANDOM) {
- initRandom(data)
- } else {
- initKMeansParallel(data)
- }
- }
- }
- val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
- logInfo(s"Initialization with $initializationMode took " +
"%.3f".format(initTimeInSeconds) +
- " seconds.")
-
- val active = Array.fill(numRuns)(true)
- val costs = Array.fill(numRuns)(0.0)
-
- var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
+ var done = false
+ var costs = 0.0
var iteration = 0
-
val iterationStartTime = System.nanoTime()
- // Execute iterations of Lloyd's algorithm until all runs have
converged
- while (iteration < maxIterations && !activeRuns.isEmpty) {
+ // Execute Lloyd's algorithm until converged or reached the max number
of iterations
+ while (iteration < maxIterations && !done ) {
type WeightedPoint = (Vector, Long)
def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint
= {
axpy(1.0, x._1, y._1)
(y._1, x._2 + y._2)
}
- val activeCenters = activeRuns.map(r => centers(r)).toArray
- val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
+ val costAccums = sc.accumulator(0.0)
+ val bcCenters = sc.broadcast(centers)
- val bcActiveCenters = sc.broadcast(activeCenters)
// Find the sum and count of points mapping to each center
- val totalContribs = data.mapPartitions { points =>
- val thisActiveCenters = bcActiveCenters.value
- val runs = thisActiveCenters.length
- val k = thisActiveCenters(0).length
- val dims = thisActiveCenters(0)(0).vector.size
+ val totalContribs = data.flatMap { case (pointMatrix,
pointsNormMatrix) =>
+ val thisCenters = bcCenters.value
+ val k = thisCenters.length
+ val dims = thisCenters(0).vector.size
+
+ val sums = Array.fill(k)(Vectors.zeros(dims))
+ val counts = Array.fill(k)(0L)
+
+ val numRows = pointMatrix.numRows
+
+ // Construct centers matrix
+ val centersArray = new Array[Double](k * dims)
+ val centersNormArray = new Array[Double](k)
+ var i = 0
+ thisCenters.foreach { center =>
+ System.arraycopy(center.vector.toArray, 0, centersArray, i *
dims, dims)
+ centersNormArray(i) = math.pow(center.norm, 2.0)
+ i += 1
+ }
+ val centerMatrix = new DenseMatrix(dims, k, centersArray)
+
+ val a2b2 = new Array[Double](k * numRows)
+ for (i <- 0 until k; j <- 0 until numRows) {
+ a2b2(i * numRows + j) = pointsNormMatrix.values(j) +
centersNormArray(i)
+ }
- val sums = Array.fill(runs, k)(Vectors.zeros(dims))
- val counts = Array.fill(runs, k)(0L)
+ val distanceMatrix = new DenseMatrix(numRows, k, a2b2)
+ gemm(-2.0, pointMatrix, centerMatrix, 1.0, distanceMatrix)
- points.foreach { point =>
- (0 until runs).foreach { i =>
- val (bestCenter, cost) =
KMeans.findClosest(thisActiveCenters(i), point)
- costAccums(i) += cost
- val sum = sums(i)(bestCenter)
- axpy(1.0, point.vector, sum)
- counts(i)(bestCenter) += 1
+ val original = pointMatrix.values.grouped(dims).toArray
+
+ for (i <- 0 until numRows) {
+ var minCost = Double.PositiveInfinity
+ var min = -1
+ for (j <- 0 until k) {
+ val distance = distanceMatrix.values(i + j * numRows)
+ if (distance < minCost) {
+ minCost = distance
+ min = j
+ }
}
+ costAccums += minCost
+ val sum = sums(min)
+ val value = original(i)
+ axpy(1.0, Vectors.dense(value), sum)
--- End diff --
That would be problematic if `pointMatrix.values` is accessed directly,
because Spark's `DenseVector` does not support offsets. One option would be
to use Breeze's `DenseVector`, which supports in-place summation.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]