Hello,

I'm trying to run streaming k-means in the hadoop/non-local mode with
the reduceStreamingKMeans (-rskm) option enabled.

The map stage completes successfully, but the reduce stage does not: it
times out. I'm trying to understand why. I have quite a few questions,
but first here's a part of the progress output >>

15/10/07 15:37:59 INFO mapreduce.Job:  map 62% reduce 0%
15/10/07 15:38:47 INFO mapreduce.Job:  map 63% reduce 0%
15/10/07 15:39:20 INFO mapreduce.Job:  map 64% reduce 0%
15/10/07 15:39:41 INFO mapreduce.Job:  map 65% reduce 0%
15/10/07 15:40:05 INFO mapreduce.Job:  map 66% reduce 0%
15/10/07 15:40:07 INFO mapreduce.Job:  map 74% reduce 0%
15/10/07 15:40:10 INFO mapreduce.Job:  map 82% reduce 0%
15/10/07 15:40:17 INFO mapreduce.Job:  map 83% reduce 0%
15/10/07 15:40:19 INFO mapreduce.Job:  map 83% reduce 17%
15/10/07 15:40:37 INFO mapreduce.Job:  map 91% reduce 17%
15/10/07 15:40:40 INFO mapreduce.Job:  map 91% reduce 25%
15/10/07 15:40:57 INFO mapreduce.Job:  map 92% reduce 25%
15/10/07 15:41:15 INFO mapreduce.Job:  map 100% reduce 25%
15/10/07 15:41:16 INFO mapreduce.Job:  map 100% reduce 67%
15/10/07 15:41:28 INFO mapreduce.Job:  map 100% reduce 68%
15/10/07 15:42:39 INFO mapreduce.Job:  map 100% reduce 69%
15/10/07 15:45:29 INFO mapreduce.Job:  map 100% reduce 70%
15/10/07 15:50:32 INFO mapreduce.Job:  map 100% reduce 71%
15/10/07 15:57:53 INFO mapreduce.Job:  map 100% reduce 72%
15/10/07 16:08:39 INFO mapreduce.Job:  map 100% reduce 73%
15/10/07 16:23:16 INFO mapreduce.Job:  map 100% reduce 74%
15/10/07 16:40:17 INFO mapreduce.Job:  map 100% reduce 75%
15/10/07 16:59:36 INFO mapreduce.Job:  map 100% reduce 76%
15/10/07 17:22:25 INFO mapreduce.Job:  map 100% reduce 77%
15/10/07 17:51:01 INFO mapreduce.Job: Task Id :
attempt_1443139520415_1449_r_000000_0, Status : FAILED
AttemptID:attempt_1443139520415_1449_r_000000_0 Timed out after 600 secs
15/10/07 17:51:03 INFO mapreduce.Job:  map 100% reduce 0%
15/10/07 17:51:18 INFO mapreduce.Job:  map 100% reduce 67%
15/10/07 17:51:32 INFO mapreduce.Job:  map 100% reduce 68%
15/10/07 17:52:45 INFO mapreduce.Job:  map 100% reduce 69%
15/10/07 17:55:47 INFO mapreduce.Job:  map 100% reduce 70%
15/10/07 18:01:03 INFO mapreduce.Job:  map 100% reduce 71%
15/10/07 18:08:43 INFO mapreduce.Job:  map 100% reduce 72%

It appears that the reduce task container fails to report progress
within the configured timeout interval. Also, before the reduce task
timed out, each progress update took longer than the previous one.
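As a stopgap I can presumably raise Hadoop's task timeout (the "600
secs" in the log matches mapreduce.task.timeout's default of 600000
ms), though I realize that would only mask the slowdown, not fix it:

```
# Assumption: the streamingkmeans driver forwards generic -D options to the job.
mahout streamingkmeans -Dmapreduce.task.timeout=1800000 ...
```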

My understanding is that without the -rskm switch, the map stage is a
streaming k-means step and the reduce stage is the ball k-means step.
With the switch however, I understand that the reduce stage consists
of a streaming k-means step as well, wherein the somewhat large number
of centroids from the map stage are clustered again to get a smaller
set of centroids for the ball k-means step. Is this so?
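To check that understanding, here is my mental model of the streaming
step as a runnable 1-D toy (purely illustrative: the real
implementation works on vectors, collapses the sketch when it
overflows, and delegates the nearest-centroid lookup to the configured
searcher class):

```python
def streaming_sketch(points, max_clusters, dist_cutoff):
    """One-pass streaming k-means sketch (toy, 1-D): a point within
    dist_cutoff of its nearest centroid is merged into it; otherwise it
    opens a new centroid. When the sketch overflows max_clusters, the
    cutoff is relaxed so later points merge more aggressively."""
    centroids = []  # list of (center, weight) pairs
    for p in points:
        best, best_d = None, float("inf")
        for i, (c, _) in enumerate(centroids):
            d = abs(c - p)  # stand-in for the configured distance measure
            if d < best_d:
                best, best_d = i, d
        if best is not None and best_d < dist_cutoff:
            c, w = centroids[best]
            centroids[best] = ((c * w + p) / (w + 1), w + 1)  # weighted merge
        else:
            centroids.append((p, 1.0))
        if len(centroids) > max_clusters:
            dist_cutoff *= 1.5  # relax; my understanding is Mahout also re-collapses
    return centroids
```

With -rskm, my assumption is that the reducer first runs this same kind
of sketch over the concatenated (weighted) mapper centroids, and only
then hands the shrunken set to ball k-means.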

My command line invocation has these options specified:

-k 450 -km 2500 -e 0.8 -mi 10 -tf 0.8 -testp 0.2 -nbkm 5 -dm
org.apache.mahout.common.distance.TanimotoDistanceMeasure -sc
org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch -s 6
-rskm

My input tf-vectors are sparse vectors (binary-valued with at most 6
non-zero dimensions; centroids of course, over the course of
iterations, don't remain as sparse as inputs). There are just under
200,000 input vectors.

It would be great if you could help me understand what's causing the
job to time out. Based on the invocation switches above, do we know how
each of the following aspects impacts the progress _and_ progress
reporting of the reduce stage? >>

a) choice of searcherClass and searchSize. I'm using LSH Search with a
size of 6.

b) choice of testProbability and number of ball k-means runs. I'm
using 0.2 and 5 respectively. Does specifying more than one run
influence the runtime of the reduce stage?

c) choice of maximum number of iterations to run for ball k-means. I
have set it to 10.

d) choice of initial estimated distance cutoff. I have set it to 0.8.
I believe this is a parameter for streaming step and not ball step?

e) choice of trim fraction. I have set it to 0.8. My understanding is
that increasing the trim fraction means more vectors are used when
computing each centroid.

f) choice of distance measure. I know I'm using Tanimoto, and that the
streaming k-means papers referenced by Mahout focus on Euclidean
distance. But from an implementation point of view, are there
implications for memory usage and computation if one chooses a
particular distance measure (especially given that my vectors are
sparse and my measure is non-Euclidean)?

g) finally, the choice of k and numMapClusters (~ k*logN). I have set
these to 450 and 2500 respectively. The Mahout invocation results in 4
map tasks being launched.
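Regarding (f): for concreteness, this is how I understand Mahout's
TanimotoDistanceMeasure to generalize beyond binary vectors (sparse
dicts stand in for Mahout's sparse vectors; my reading, not verified
against the source):

```python
def tanimoto_distance(a, b):
    """1 - dot(a,b) / (|a|^2 + |b|^2 - dot(a,b)); my reading of Mahout's
    TanimotoDistanceMeasure. a and b are sparse {index: value} dicts."""
    dot = sum(v * b.get(i, 0.0) for i, v in a.items())
    na = sum(v * v for v in a.values())
    nb = sum(v * v for v in b.values())
    denom = na + nb - dot
    return 0.0 if denom == 0.0 else 1.0 - dot / denom
```

If that's right, each distance call costs on the order of the non-zero
counts of both arguments, so the increasingly dense centroids would
make every lookup more expensive than on my 6-non-zero inputs.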

At https://issues.apache.org/jira/browse/MAHOUT-1469, Ted mentions
that: "There is a known bottle neck in the parallel version due to the
fact that if you split the computation m ways, each split has to
produce k log N/m sketch clusters for a total of mk log N/m clusters
passed to the reducers. If k = 1000 and you have m=1000 mappers, then
processing a billion points will produce 20,000 sketch clusters from
each mapper and you will potentially have 20,000,000 centroids passed
to the reducer which is only 50x less than the original data size."
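Plugging my numbers into that accounting (assuming each of the 4
mappers emits up to the -km = 2500 centroids):

```python
m, km, n = 4, 2500, 200_000   # mappers observed, -km, input vectors

reducer_input = m * km        # worst-case centroids reaching the one reducer
compression = n / reducer_input

print(reducer_input, compression)  # 10000 20.0
```

So the single reducer would see on the order of 10,000 weighted
centroids, only 20x fewer than the raw input, and with -rskm it has to
sketch all of them again, which might explain the decelerating progress
updates.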

Is it not the case that the number specified with the -km switch gets
updated as the streaming step processes each subsequent input vector? The page
https://mahout.apache.org/users/clustering/streaming-k-means.html
states that clusterLogFactor is "a constant parameter such that
clusterLogFactor log(numProcessedPoints) is the runtime estimate of
the number of clusters to be produced by the streaming step", but I'm
not sure if this parameter is exposed via a switch.
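To make that question concrete, here is my literal reading of the docs'
sentence (assumptions: natural log, and that the product itself is the
sketch-size target; I haven't checked the StreamingKMeans source):

```python
import math

def target_sketch_size(cluster_log_factor, points_processed):
    # Docs: "clusterLogFactor * log(numProcessedPoints) is the runtime
    # estimate of the number of clusters produced by the streaming step".
    return cluster_log_factor * math.log(points_processed)

# Back-solving for the factor that would reach my -km = 2500 at N = 200,000:
factor = 2500 / math.log(200_000)
print(round(factor))  # 205
```

If the estimate really does grow with each processed point like this,
then -km would be a final target rather than a fixed cap, which is what
I'd like to confirm.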


Any insight is greatly appreciated!


Thanks,
Harsh
