Hello,
I'm trying to run streaming k-means in Hadoop (non-local) mode with the reduce_streaming_k_means option enabled. The map stage completes successfully, but the reduce stage does not, and eventually times out. I'm trying to understand why. I have quite a few questions, but first, here is part of the progress output:

15/10/07 15:37:59 INFO mapreduce.Job: map 62% reduce 0%
15/10/07 15:38:47 INFO mapreduce.Job: map 63% reduce 0%
15/10/07 15:39:20 INFO mapreduce.Job: map 64% reduce 0%
15/10/07 15:39:41 INFO mapreduce.Job: map 65% reduce 0%
15/10/07 15:40:05 INFO mapreduce.Job: map 66% reduce 0%
15/10/07 15:40:07 INFO mapreduce.Job: map 74% reduce 0%
15/10/07 15:40:10 INFO mapreduce.Job: map 82% reduce 0%
15/10/07 15:40:17 INFO mapreduce.Job: map 83% reduce 0%
15/10/07 15:40:19 INFO mapreduce.Job: map 83% reduce 17%
15/10/07 15:40:37 INFO mapreduce.Job: map 91% reduce 17%
15/10/07 15:40:40 INFO mapreduce.Job: map 91% reduce 25%
15/10/07 15:40:57 INFO mapreduce.Job: map 92% reduce 25%
15/10/07 15:41:15 INFO mapreduce.Job: map 100% reduce 25%
15/10/07 15:41:16 INFO mapreduce.Job: map 100% reduce 67%
15/10/07 15:41:28 INFO mapreduce.Job: map 100% reduce 68%
15/10/07 15:42:39 INFO mapreduce.Job: map 100% reduce 69%
15/10/07 15:45:29 INFO mapreduce.Job: map 100% reduce 70%
15/10/07 15:50:32 INFO mapreduce.Job: map 100% reduce 71%
15/10/07 15:57:53 INFO mapreduce.Job: map 100% reduce 72%
15/10/07 16:08:39 INFO mapreduce.Job: map 100% reduce 73%
15/10/07 16:23:16 INFO mapreduce.Job: map 100% reduce 74%
15/10/07 16:40:17 INFO mapreduce.Job: map 100% reduce 75%
15/10/07 16:59:36 INFO mapreduce.Job: map 100% reduce 76%
15/10/07 17:22:25 INFO mapreduce.Job: map 100% reduce 77%
15/10/07 17:51:01 INFO mapreduce.Job: Task Id : attempt_1443139520415_1449_r_000000_0, Status : FAILED
AttemptID:attempt_1443139520415_1449_r_000000_0 Timed out after 600 secs
15/10/07 17:51:03 INFO mapreduce.Job: map 100% reduce 0%
15/10/07 17:51:18 INFO mapreduce.Job: map 100% reduce 67%
15/10/07 17:51:32 INFO mapreduce.Job: map 100% reduce 68%
15/10/07 17:52:45 INFO mapreduce.Job: map 100% reduce 69%
15/10/07 17:55:47 INFO mapreduce.Job: map 100% reduce 70%
15/10/07 18:01:03 INFO mapreduce.Job: map 100% reduce 71%
15/10/07 18:08:43 INFO mapreduce.Job: map 100% reduce 72%

It appears that the reduce task container fails to report progress within the configured timeout interval. Also, before the reduce task timed out, each progress update took longer than the previous one.

My understanding is that without the -rskm switch, the map stage is the streaming k-means step and the reduce stage is the ball k-means step. With the switch, however, the reduce stage runs a streaming k-means step as well, in which the fairly large number of centroids coming out of the map stage is clustered again to obtain a smaller set of centroids for the ball k-means step. Is this correct?

My command-line invocation has these options specified:

-k 450 -km 2500 -e 0.8 -mi 10 -tf 0.8 -testp 0.2 -nbkm 5 -dm org.apache.mahout.common.distance.TanimotoDistanceMeasure -sc org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch -s 6 -rskm

My input tf-vectors are sparse, binary-valued vectors with at most 6 non-zero dimensions (the centroids, of course, do not remain as sparse as the inputs over the course of the iterations). There are just under 200,000 input vectors.

It would be great if you could help me understand what is causing the job to time out. Based on the invocation switches above, do we know how each of the following affects the progress, and the progress reporting, of the reduce stage?

a) The choice of searcherClass and searchSize. I'm using LocalitySensitiveHashSearch with a search size of 6.

b) The choice of testProbability and the number of ball k-means runs. I'm using 0.2 and 5 respectively. Does specifying more than one run influence this?

c) The choice of the maximum number of iterations for ball k-means. I have set it to 10.

d) The choice of the initial estimated distance cutoff. I have set it to 0.8. I believe this is a parameter for the streaming step and not the ball step?

e) The choice of trim fraction. I have set it to 0.8. My understanding is that increasing the trim fraction means more vectors are used in computing a centroid.

f) The choice of distance measure. I know I'm using Tanimoto, and that the streaming k-means papers referenced by Mahout focus on Euclidean distance. But from an implementation point of view, are there implications for memory usage and computation if one chooses a particular distance measure (especially given that my vectors are sparse and my measure is non-Euclidean)?

g) Finally, the choice of k and numMapClusters (~ k*log N). I have set these to 450 and 2500 respectively.

The Mahout invocation results in 4 map tasks being launched. At https://issues.apache.org/jira/browse/MAHOUT-1469, Ted mentions that:

"There is a known bottle neck in the parallel version due to the fact that if you split the computation m ways, each split has to produce k log N/m sketch clusters for a total of mk log N/m clusters passed to the reducers. If k = 1000 and you have m=1000 mappers, then processing a billion points will produce 20,000 sketch clusters from each mapper and you will potentially have 20,000,000 centroids passed to the reducer which is only 50x less than the original data size."

Is it not the case that the number specified with the -km switch is itself updated as the streaming step processes each subsequent input vector? The page https://mahout.apache.org/users/clustering/streaming-k-means.html states that clusterLogFactor is "a constant parameter such that clusterLogFactor log(numProcessedPoints) is the runtime estimate of the number of clusters to be produced by the streaming step", but I'm not sure whether this parameter is exposed via a switch.

Any insight is greatly appreciated!

Thanks,
Harsh
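P.S. In case it helps frame questions a), d), and f): here is a toy Python sketch of my mental model of the map-side streaming step. This is not Mahout's code — the cutoff-update rule and the collapse/re-cluster step are simplified, sparse vectors are plain {index: value} dicts, and the nearest-centroid search is brute force rather than LSH:

```python
import random

def tanimoto_distance(a, b):
    # Tanimoto distance between sparse vectors given as {index: value} dicts;
    # for binary vectors this is the Jaccard distance 1 - |A ∩ B| / |A ∪ B|.
    dot = sum(v * b.get(i, 0.0) for i, v in a.items())
    denom = (sum(v * v for v in a.values())
             + sum(v * v for v in b.values()) - dot)
    return 0.0 if denom == 0.0 else 1.0 - dot / denom

def streaming_step(points, max_clusters, distance_cutoff, beta=1.3, seed=0):
    # Single pass over the data: each point either merges into its nearest
    # sketch centroid or, with probability proportional to its distance,
    # opens a new sketch cluster. Returns a list of (centroid, weight) pairs.
    rng = random.Random(seed)
    centroids = []
    for p in points:
        if not centroids:
            centroids.append((dict(p), 1.0))
            continue
        d, idx = min((tanimoto_distance(p, c), i)
                     for i, (c, _) in enumerate(centroids))
        if rng.random() < d / distance_cutoff:
            centroids.append((dict(p), 1.0))  # open a new sketch cluster
        else:
            c, w = centroids[idx]  # merge as a weighted mean
            merged = {i: v * w / (w + 1.0) for i, v in c.items()}
            for i, v in p.items():
                merged[i] = merged.get(i, 0.0) + v / (w + 1.0)
            centroids[idx] = (merged, w + 1.0)
        if len(centroids) > max_clusters:
            # the real implementation also re-clusters the sketch centroids
            # here; this toy version only relaxes the cutoff so that fewer
            # new clusters get opened
            distance_cutoff *= beta
    return centroids
```

Even in this toy version you can see why the searcher choice matters (the nearest-centroid lookup runs once per input point) and why the centroids densify over time: every merge folds the point's non-zero dimensions into the centroid.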
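P.S. To put numbers on Ted's comment for my own job (assuming the natural log in the k*log(N/m) estimate; the m = 4, N ≈ 200,000, k = 450 and -km = 2500 values are from my invocation above):

```python
import math

m, n, k, km = 4, 200_000, 450, 2500

# Ted's per-mapper estimate of sketch clusters: k * log(N / m)
per_mapper = k * math.log(n / m)

# -km caps each mapper's output, so the reducer receives at most m * km centroids
reducer_input = m * km

print(round(per_mapper), reducer_input)  # 4869 10000
```

So even with the -km cap, the reduce-side streaming step has to re-cluster on the order of 10,000 (increasingly dense) centroids down toward k = 450, which seems consistent with each reduce progress increment taking longer than the last.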