Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1460#issuecomment-50072248
Hey Davies, I tried this out a bit and saw two issues / areas for
improvement:
1) Since the ExternalMerger is used in both map tasks and reduce tasks, one
problem is that a reduce task's keys have already been partitioned by hash
modulo the # of reduce tasks, so many of its buckets stay empty. For example,
with 2 reduce tasks, task 0 gets only the keys whose hash code is even, so it
can use at most half of its buckets, and with 64 reduce tasks only a single
bucket gets used.
The best way to fix this would be to hash keys with a randomized hash
function when choosing the bucket. One simple approach is to generate a random
integer X for each ExternalMerger and take hash((key, X)) instead of
hash(key) when picking the bucket; this is equivalent to salting the hash
function. Maybe you have other ideas, but I'd suggest trying this first.
2) I also noticed that the maps would sometimes fill up again before the old
memory was fully freed, leading to smaller spills. For example, for (Int, Int)
pairs the first spill from 512 MB of memory was about 68 MB of files, but later
spills were only around 20 MB. I found that I could get better performance
overall by adding gc.collect() calls after every data.clear() and
pdata.clear(); this freed memory faster and let us do more work in memory
before spilling. The perf difference for one test job was around 30%, but you
should try it on your own jobs.
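For example, roughly like this (just a sketch; dump_to_disk is a stand-in for
whatever actually writes the buckets out, not a real method):

    import gc

    def spill_and_clear(data, pdata, dump_to_disk):
        # write the in-memory buckets out, then drop the references
        dump_to_disk(data, pdata)
        data.clear()
        pdata.clear()
        # force a collection so the cleared dicts are released right away,
        # letting the next fill cycle use the full memory budget
        gc.collect()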