If you look at the MeanShiftCanopyMapper, you will notice that the mapper 
writes all merged canopy values to a single "canopy" key. Running this with 
multiple reducers gives the same performance as running with a single reducer, 
since only one reducer will ever receive the values associated with that key. 
That is why the driver sets numReduceTasks(1).
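
Roughly speaking, the pattern looks like this (a simplified sketch for 
illustration, not the actual Mahout source; I've reduced the value type to 
VectorWritable):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VectorWritable;

// Simplified sketch, not the real MeanShiftCanopyMapper: every output record
// is written under the same constant key, so Hadoop's default partitioner
// routes all of them to one reducer no matter how many reduce tasks exist.
public class SingleKeyMapperSketch
    extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {

  private static final Text CANOPY_KEY = new Text("canopy");

  @Override
  protected void map(WritableComparable<?> key, VectorWritable value, Context context)
      throws IOException, InterruptedException {
    // ... merging/shifting logic elided ...
    context.write(CANOPY_KEY, value); // single key => one reducer sees everything
  }
}

With a single constant key, setNumReduceTasks(16) just produces 15 idle 
reducers: the default hash partitioner maps the one key to a single partition.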

I've thought a bit about how this might be improved. It is clearly a form of 
bottom-up hierarchical clustering algorithm: 
- The input vector set is first converted into an equivalent set of 1-vector 
clusters by a map-only process that preserves the number of input files. 
- Then the cluster set is partitioned across M mappers by Hadoop. Each mapper 
combines clusters that "touch" and shifts all clusters toward their local 
means, using T1 and T2 parameters as in Canopy. Presumably, each mapper's 
output set will be smaller than its input set due to the combination of 
touching clusters.
- Here's where the scalability breaks down: a single reducer is used to merge 
the mapper output sets into a single file, after which a single mapper and a 
single reducer complete the remaining iterations.
- How can we utilize more reducers and thus keep the concurrency levels up? 
-- We could imagine just using map-only processing, but this would keep the 
original input sets distinct. Each mapper set would converge internally but 
points close to each other in different mapper sets would never see each other 
and thus could never merge. Not workable; we need the input sets to be mixed in 
each iteration to avoid this.
-- If we knew we were going to have M mappers for an iteration, and we wanted 
to use R reducers, then one approach might be to randomly assign each 
mapper-output cluster to one of the R reducers (see the partitioner sketch 
after this list). This would mix up the mapper streams in each iteration and 
allow adjacent points to be combined across the mapper streams. If we 
simultaneously decreased R on each iteration (as the clusters merge), then 
finally there would be a single output file that had considered all of the 
input points.
- Of course, there is another scalability limitation in the current 
implementation: each cluster contains the clusterIds of all the other clusters 
it has consumed, so the size of each cluster grows monotonically and will at 
some point outgrow the heap. This could be handled by writing out merge 
records during processing and then combining them afterwards to determine the 
point->cluster mapping (a sketch of that follows the partitioner example 
below). This is likely not necessary until the first limitation is addressed, 
but could be done independently.
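
To make the random-assignment idea concrete, a custom partitioner along these 
lines (purely hypothetical, nothing like it exists in Mahout today) would 
scatter each mapper's output clusters across whatever number of reducers the 
iteration is configured with:

import java.util.Random;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.mahout.math.VectorWritable;

// Hypothetical partitioner: ignores the key and scatters records uniformly
// across the configured reduce tasks, so each reducer sees a mixture of
// clusters drawn from all of the mapper streams.
public class RandomClusterPartitioner extends Partitioner<Text, VectorWritable> {

  private final Random random = new Random();

  @Override
  public int getPartition(Text key, VectorWritable value, int numPartitions) {
    return random.nextInt(numPartitions);
  }
}

The driver would set job.setPartitionerClass(RandomClusterPartitioner.class) 
and lower job.setNumReduceTasks(r) on each iteration until r reaches 1. 
Hashing the cluster id instead of calling Random would give the same mixing 
while keeping the assignment deterministic across task re-execution.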
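
On the second point, the merge-record idea might look something like the 
sketch below (made-up names, nothing from the current code): each time a 
cluster consumes another, a small (consumedId -> survivorId) record is 
written out, and a post-processing step chases the chains to recover which 
cluster each original point ended up in.

import java.util.HashMap;
import java.util.Map;

// Hypothetical post-processing step: given (consumedId -> survivorId) merge
// records collected during clustering, follow each chain to the final
// surviving cluster so the point->cluster mapping can be rebuilt without any
// single cluster ever having to hold all of the ids it has absorbed.
public class MergeRecordResolver {

  public static Map<Integer, Integer> resolve(Map<Integer, Integer> merges) {
    Map<Integer, Integer> finalOwner = new HashMap<Integer, Integer>();
    for (Map.Entry<Integer, Integer> merge : merges.entrySet()) {
      Integer owner = merge.getValue();
      // The survivor may itself have been consumed in a later merge.
      while (merges.containsKey(owner)) {
        owner = merges.get(owner);
      }
      finalOwner.put(merge.getKey(), owner);
    }
    return finalOwner;
  }
}

In a real job the merge records would live in a sequence file rather than an 
in-memory Map, but the chain-following is the essential part.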

I'm open to other ways of thinking about the scalability of mean shift. 
Comments would be appreciated.

 

-----Original Message-----
From: Sengupta, Sohini IN BLR SISL [mailto:[email protected]] 
Sent: Thursday, June 23, 2011 5:59 AM
To: [email protected]
Subject: FW: meanshift reduce task problem

Any inputs will be helpful.
Thanks

________________________________
From: Sengupta, Sohini IN BLR SISL
Sent: Wednesday, June 22, 2011 5:15 PM
To: [email protected]
Cc: Sengupta, Sohini IN BLR SISL
Subject: meanshift reduce task problem

Hi,

I have programmatically specified setNumReduceTasks(16) in 
MeanShiftCanopyDriver.java. On execution, the number of reducers is set 
correctly (i.e. 16, as visible on the jobtracker screen), but on digging 
deeper I see that one node has the maximum number of bytes to process while 
it is nominal for the rest of the nodes. Hence the reduce phase is very slow 
after 98% completion.

I am trying this on a cluster of 18 nodes. I also see that the load is 
distributed evenly in the map phase but not in the reduce phase. This happens 
on both the 0.4 and 0.5 versions of Mahout. Has anyone faced such a problem, 
and how can it be worked around?
Thanks a lot in advance,
Sohini

