Hi Tim,

Getting 28K of map output to the reducers should not take minutes. Reducers on a properly set up 1Gb network should be copying at multiple MB/s (gigabit Ethernet gives you roughly 125 MB/s of theoretical bandwidth, so 28K is nothing). I think you need to gather some more info.
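If you want to rule out the raw network first, a quick and dirty test between two nodes (assuming netcat is installed; the node name below is just a placeholder) is something like:

    # on the receiving node, listen on an arbitrary port and discard the data
    # (older netcats want "nc -l -p 12345" instead)
    nc -l 12345 > /dev/null

    # on the sending node, push 1GB across and let dd report the throughput
    dd if=/dev/zero bs=1M count=1024 | nc receiving-node 12345

On gigabit you should see on the order of 100 MB/s; much less than that points at the network or NIC settings rather than at Hadoop.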
Apart from top, you'll probably also want to look at iostat and vmstat. The first will tell you something about disk utilization, and the latter can tell you whether the machines are using swap or not. This is very important: if you are over-utilizing physical memory on the machines, things will be slow.

It's even better if you put something in place that gives you an overall view of resource usage across the cluster. Look at Ganglia (http://ganglia.sourceforge.net/) or Cacti (http://www.cacti.net/) or something similar.

Basically, a job is either CPU bound, IO bound or network bound, and you need to be able to look at all three to see where the bottleneck is. Also, you can run into churn when you saturate a resource and processes start competing for it. For example, when you have two disks and 50 processes or threads reading from them, things will be slow because the OS has to switch between them constantly, and overall throughput will be less than what the disks can do. You can spot this when a lot of time is spent in iowait while overall throughput stays low: that means there's a lot of seeking going on.

On 17 nov 2010, at 09:43, Tim Robertson wrote:

> Hi all,
>
> We have set up a small cluster (13 nodes) using CDH3. We have been
> tuning it using TeraSort and Hive queries on our data, and the copy
> phase is very slow, so I'd like to ask if anyone can look over our
> config.
>
> We have an unbalanced set of machines (all on a single switch):
> - 10 of Intel @ 2.83GHz Quad, 8GB, 2x500GB 7.2K SATA (3 mappers, 2 reducers)
> - 3 of Intel @ 2.53GHz Dual Quad, 24GB, 6x250GB 5.4K SATA (12 mappers, 12 reducers)
>
> We monitored the load using $top on the machines to settle on the
> number of mappers and reducers without overloading them, and the
> map() and reduce() phases are working very nicely - all our time is
> going into the copy phase.
>
> The config:
> io.sort.mb=400
> io.sort.factor=100
> mapred.reduce.parallel.copies=20
> tasktracker.http.threads=80
> mapred.compress.map.output=true/false (no noticeable difference)
> mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec
> mapred.output.compression.type=BLOCK
> mapred.inmem.merge.threshold=0
> mapred.job.reduce.input.buffer.percent=0.7
> mapred.job.reuse.jvm.num.tasks=50
>
> An example job (select basis_of_record, count(1) from occurrence_record
> group by basis_of_record):
> Map input records: 262,573,931
> Map finished in 2 mins 30 secs using 833 mappers
> Reduce was at 24% when the maps finished at 2 mins 30 secs, with all 55 reducers running
> Map output records: 1,855
> Map output bytes: 28,724
> Reduce copy phase finished after 7 mins 01 secs
> Reduce finished after 7 mins 17 secs
>
> Am I correct that copying 28,724 bytes of map output should not take
> 4 mins 30 secs?
>
> We're running puppet, so we can test changes quickly. Any pointers on
> how we can debug / improve this are greatly appreciated!
>
> Tim
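PS: to get going with the iostat / vmstat suggestion above, running something like the following on a couple of nodes while the copy phase is underway should tell you a lot (iostat -x needs the sysstat package; the 5 second interval is just a suggestion):

    # extended per-disk stats every 5 seconds: watch %util and await
    iostat -x 5

    # memory and swap every 5 seconds: si/so > 0 means you are swapping,
    # and a high "wa" column means time is being spent waiting on IO
    vmstat 5

    # one-off check of free memory and swap usage, in MB
    free -m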