Hi,

I was playing around with other k-means implementations in Scala/Spark in
order to test performances and get a better grasp on Spark.

Now, I made one similar to the one from the examples
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala),
except that it's a bit less clever. Nevertheless, I expect a non-expert
scala/spark programmer to write similar code instead of that from the
example.

Here is how they compare: in the step of calculating the new centroids (this
is done by taking the average of all points belonging to the current
centroids - the main workhorse of the algo), where the *example algorithm*
adds the points of the same cluster and keeps track of the number of points
in each cluster in 1 step (by using reduceByKey and keeping a counter in the
reduce value):

val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
        
val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2,
y1 + y2)}

and then proceeds by dividing the sum of all points of a cluster by the
counted number of points in the cluster:

val newPoints = pointStats.map {pair => (pair._1, pair._2._1 /
pair._2._2)}.collectAsMap()

Afterwards the change of the new centroids is calculated in order to know
when to stop iterating:

tempDist = 0.0

for (i <- 0 until K) {
     tempDist += kPoints(i).squaredDist(newPoints(i))
}



*my algorithm *
(https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala)
is less clever, but more straightforward: it just groups all the points of
each cluster and then proceeds to calculate the average on those points and
adds the difference with the previous centroid to an accumulator:

 // accumulator for differences new centroids
 dist = sc.accumulator(0.0)

// calculate new centroids + add difference to old centroids
centroids = closest.groupByKey().map{case(i, points) =>
    val newCentroid = average(points)
    dist += centroids(i).squaredDist(newCentroid)
    newCentroid
}.collect()

with:

def average(points: Seq[Vector]) : Vector = {
    points.reduce(_+_) / points.length
}

So, the big differences are:

1. Use of accumulator
2. Do excessive work by not cleverly calculating the average
3. Accesses the centroids val from within the map


Now, why I'm here for, this version runs EXTREMELY slow and gets
outOfHeapMemory exceptions for data input that the original algorithm easily
solves in ~5seconds. I'm trying to pinpoint what exactly is causing this
huge difference. The use of an accumulator shouldn't really affect the
performance and it doesn't, because I tried it without the accumulator and
it stays as slow. Further, I expect the excessive work to slow down the
algorithm with a factor of 2 or something, but this is really a decrease in
factors of 10 or more.

Even with 1 worker and 1 core (thus no parallelism) the difference in speed
stays the same, so it's not because the averaging is not parallelised
properly, there must be something going on that is much more important.

Could someone give me pointers on what exactly is happening here? It can't
be because I'm just accessing the centroids value from within the closure?

Speed comparison:

The *slow algorithm*: 44 seconds to perform the map
14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at
k-means.scala:114) finished in 43.909 s


The *fast algorithm*: more or less the same operations (in 2 steps instead
of 1) in 2.2 seconds

14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
k-means.scala:84) finished in 2.090 s
....
14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at
k-means.scala:86) finished in 0.117 s


Thanks in advance,
Tim.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to