Thanks. You are correct that this strategy does not achieve a total sort, only a partial (per-reducer) sort, which is all the application requires. I think the technique is sometimes referred to as "secondary sort"; KeyFieldBasedPartitioner is sometimes used as a convenience to implement it, but our implementation just uses HashPartitioner with a specially designed hash function.
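Roughly, the idea looks like this (a simplified sketch only; the field names are made up and our real key class is application-specific):

    // Composite key whose hashCode() (used by HashPartitioner) depends only on
    // the grouping field, while compareTo() orders on the full key.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class GroupedSortKey implements WritableComparable<GroupedSortKey> {
        private Text group = new Text();    // records sharing this must land on one reducer
        private Text sortKey = new Text();  // effectively-unique sorting key

        public void set(String g, String k) { group.set(g); sortKey.set(k); }

        @Override
        public void write(DataOutput out) throws IOException {
            group.write(out);
            sortKey.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            group.readFields(in);
            sortKey.readFields(in);
        }

        @Override
        public int compareTo(GroupedSortKey other) {
            // Sort order uses the full (group, sortKey) pair.
            int cmp = group.compareTo(other.group);
            return (cmp != 0) ? cmp : sortKey.compareTo(other.sortKey);
        }

        @Override
        public int hashCode() {
            // Hash only the group, so HashPartitioner co-locates each group.
            return group.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            return (o instanceof GroupedSortKey) && compareTo((GroupedSortKey) o) == 0;
        }
    }

Since HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, overriding hashCode() this way is enough to keep every record of a group on a single reducer while the full key still drives the sort order.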
Thanks for your advice re. the options. I'll investigate further by tweaking them. If I end up filing a Hadoop bug, I'll try to remember to follow up here.

On Thu, Feb 21, 2013 at 4:43 AM, Hemanth Yamijala <[email protected]> wrote:
> Hi,
>
> I might be going slightly tangential here. Since you mention sorting - is
> this sorting the total input? In that case, does HashPartitioner even
> work? Because the partitions would only be locally sorted - but globally
> unsorted.
>
> There is a sort example in Hadoop:
> http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/examples/org/apache/hadoop/examples/Sort.java?view=markup
> which you can look at to see how it works. Basically there is a different
> type of partitioner called TotalOrderPartitioner that it uses to achieve
> global sorting of the input.
>
> Regarding the configuration options being more tuning related than
> correctness related - I can't vouch for it. However, I know cases where we
> have been able to tune these options and make MR programs work on larger
> inputs.
>
> Thanks
> hemanth
>
> On Thu, Feb 21, 2013 at 4:33 PM, Shivaram Lingamneni
> <[email protected]> wrote:
>>
>> Thanks very much for your helpful response!
>>
>> I should go into some more details about this job. It's essentially a
>> use of the Hadoop framework to sort a large amount of data. The mapper
>> transforms a record to (sorting_key, record), where the sorting keys
>> are effectively unique, and the reducer is trivial, outputting the
>> record and discarding the sorting key, so the memory consumption of
>> both the map and the reduce steps is intended to be O(1).
>>
>> However, due to the nature of the sorting, it's necessary that certain
>> sets of records appear together in the sorted output. Thus the
>> partitioner (HashPartitioner with a specially designed hash function)
>> will sometimes be forced to send a large number of records to a
>> particular reducer. This is not desirable, and it occurs only rarely,
>> but it's not feasible to prevent it from happening on a deterministic
>> basis. You could say that it creates a reliability engineering problem.
>>
>> My understanding of the configuration options you've linked to is that
>> they're intended for performance tuning, and that even if the defaults
>> are not optimal for a particular input, the shuffle should still
>> succeed, albeit more slowly than it could have otherwise. In
>> particular, it seems like the ShuffleRamManager class (I think
>> ReduceTask.ReduceCopier.ShuffleRamManager) is intended to prevent my
>> crash from occurring, by disallowing the in-memory shuffle from using
>> up all the JVM heap.
>>
>> Is it possible that the continued existence of this OutOfMemoryError
>> represents a bug in ShuffleRamManager, or in some other code that is
>> intended to prevent this situation from occurring?
>>
>> Thanks so much for your time.
>>
>> On Wed, Feb 20, 2013 at 5:41 PM, Hemanth Yamijala
>> <[email protected]> wrote:
>> > There are a few tweaks in configuration that may help. Can you please
>> > look at
>> > http://hadoop.apache.org/docs/r1.0.4/mapred_tutorial.html#Shuffle%2FReduce+Parameters
>> >
>> > Also, since you have mentioned reducers are unbalanced, could you use
>> > a custom partitioner to balance out the outputs. Or just increase the
>> > number of reducers so the load is spread out.
>> >
>> > Thanks
>> > Hemanth
>> >
>> > On Wednesday, February 20, 2013, Shivaram Lingamneni wrote:
>> >>
>> >> I'm experiencing the following crash during reduce tasks:
>> >>
>> >> https://gist.github.com/slingamn/04ff3ff3412af23aa50d
>> >>
>> >> on Hadoop 1.0.3 (specifically I'm using Amazon's EMR, AMI version
>> >> 2.2.1). The crash is triggered by especially unbalanced reducer
>> >> inputs, i.e., when one reducer receives too many records. (The reduce
>> >> task gets retried three times, but since the data is the same every
>> >> time, it crashes each time in the same place and the job fails.)
>> >>
>> >> From the following links:
>> >>
>> >> https://issues.apache.org/jira/browse/MAPREDUCE-1182
>> >>
>> >> http://hadoop-common.472056.n3.nabble.com/Shuffle-In-Memory-OutOfMemoryError-td433197.html
>> >>
>> >> it seems as though Hadoop is supposed to prevent this from happening
>> >> by intelligently managing the amount of memory that is provided to
>> >> the shuffle. However, I don't know how ironclad this guarantee is.
>> >>
>> >> Can anyone advise me on how robust I can expect Hadoop to be to this
>> >> issue, in the face of highly unbalanced reducer inputs? Thanks very
>> >> much for your time.
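P.S. For the archives, here's roughly what I plan to experiment with, based on the Shuffle/Reduce Parameters section Hemanth linked (old mapred API on Hadoop 1.x; the class name is made up and the values are just illustrative starting points, not recommendations):

    import org.apache.hadoop.mapred.JobConf;

    public class ShuffleTuning {
        public static void apply(JobConf conf) {
            // Fraction of the reducer heap used to hold map outputs during the copy phase.
            conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.50f);
            // Usage threshold of that buffer at which an in-memory merge is started.
            conf.setFloat("mapred.job.shuffle.merge.percent", 0.50f);
            // Number of in-memory map outputs that triggers an in-memory merge.
            conf.setInt("mapred.inmem.merge.threshold", 500);
            // Fraction of heap allowed to retain map outputs during the reduce itself;
            // 0.0 forces everything to disk before the reduce starts.
            conf.setFloat("mapred.job.reduce.input.buffer.percent", 0.0f);
            // Number of parallel copier threads fetching map outputs.
            conf.setInt("mapred.reduce.parallel.copies", 5);
        }
    }

In particular, lowering mapred.job.shuffle.input.buffer.percent should reduce how much of the reducer's heap the in-memory shuffle is allowed to claim.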
