I have a MapReduce job that reads in several gigs of log files and separates the records based on who generated them. My MapReduce job looks like this:
InputFormat: NLineInputFormat
 - Reads N lines of an input file; each line is an array of URLs for log files to download.

Mapper: <LongWritable, Text, WhoKey, LogRecord>
 - (LongWritable, Text) comes from the input format. The Text is parsed into an array of URLs, each log is downloaded, and the records are extracted.
 - WhoKey is just a multipart key that describes who caused a record to be logged.
 - LogRecord is the record that they logged, with all irrelevant information purged.

Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
 - Iterates through the LogRecords for the WhoKey, adds them to a LinkedList (AggregateRecords), and emits that to the output format.

OutputFormat: <WhoKey, AggregateRecords>
 - Creates a file for each WhoKey and writes the records into it.

However, I'm running into a problem. When a single person generates an inordinate number of records, all of them have to be held in memory and my heap space runs out. I could increase the heap size, but that would not solve the problem: they could simply generate more records and break it again. I've spent a lot of time thinking about how I could alter my setup so that no more than N records are held in memory at a time, but I can't think of a way to do it.

Is there something seriously wrong with how I am processing this? Should I have structured the job in a different way that would avoid this scenario? Isn't the MapReduce framework designed to operate on large data sets, and shouldn't it be managing the heap better?

Stderr and stack trace:

    12/06/28 12:10:55 INFO mapred.JobClient: map 100% reduce 67%
    12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:10:58 INFO mapred.JobClient: map 100% reduce 69%
    12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:11:01 INFO mapred.JobClient: map 100% reduce 71%
    12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:11:06 INFO mapred.JobClient: map 100% reduce 72%
    12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
    12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
        at java.lang.Class.newInstance0(Class.java:355)
        at java.lang.Class.newInstance(Class.java:308)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
        at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
        at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)

P.S. I have already used jmap to dump the heap, trimmed each object down to its bare minimum, and confirmed that there are no slow memory leaks.
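For reference, here is a stripped-down sketch of the mapper. The structure matches what I described above, but the parsing constructors and the URL delimiter are simplified stand-ins rather than the real code:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URL;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LogFetchMapper extends Mapper<LongWritable, Text, WhoKey, LogRecord> {

        @Override
        protected void map(LongWritable offset, Text value, Context context)
                throws IOException, InterruptedException {
            // NLineInputFormat hands us one line of the input file at a time;
            // each line is a list of URLs for log files to download.
            for (String url : value.toString().split("\\s+")) { // delimiter is a stand-in
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(new URL(url).openStream()));
                try {
                    String line;
                    while ((line = in.readLine()) != null) {
                        // Parse the raw line into a record with the irrelevant
                        // fields purged, and key it by who generated it.
                        LogRecord record = new LogRecord(line); // stand-in for the real parsing
                        WhoKey who = new WhoKey(record);        // stand-in for the real key extraction
                        context.write(who, record);
                    }
                } finally {
                    in.close();
                }
            }
        }
    }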
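Here is the same kind of sketch for the reducer, which is where the OutOfMemoryError is thrown. The copy constructor and the AggregateRecords constructor are simplified stand-ins, but the shape is the same: every LogRecord for a key is copied into the list before anything is written out, so a single heavy WhoKey pulls all of its records onto the heap at once:

    import java.io.IOException;
    import java.util.LinkedList;

    import org.apache.hadoop.mapreduce.Reducer;

    public class AggregatingReducer extends Reducer<WhoKey, LogRecord, WhoKey, AggregateRecords> {

        @Override
        protected void reduce(WhoKey key, Iterable<LogRecord> values, Context context)
                throws IOException, InterruptedException {
            LinkedList<LogRecord> records = new LinkedList<LogRecord>();
            for (LogRecord record : values) {
                // Hadoop reuses the value object it hands back from the iterator,
                // so each record has to be copied to be kept. These copies are
                // what accumulate on the heap for a prolific key.
                records.add(new LogRecord(record)); // stand-in copy constructor
            }
            // AggregateRecords is essentially the LinkedList of LogRecords.
            context.write(key, new AggregateRecords(records));
        }
    }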