I have a MapReduce job that reads in several gigs of log files and separates 
the records based on who generated them. My MapReduce job looks like this:

InputFormat: NLineInputFormat 
- Reads N lines of an input file, which is an array of URLs for log files to 
download
Mapper: <LongWritable, Text, WhoKey, LogRecord>
- (LongWritable,Text) is coming from the input format. The Text is parsed into 
an array of URLs. Each log is downloaded and the records extracted
- WhoKey is just a multipart key that describes who caused a record to be 
logged
- LogRecord is the record that they logged, with all irrelevant information 
purged
Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
- The Reducer iterates through the LogRecords for a WhoKey, adds them to a 
LinkedList (AggregateRecords), and emits that to the output format (see the 
sketch after this list)
OutputFormat: <WhoKey, AggregateRecords>
- Creates a file for each WhoKey and writes the records into it
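
For reference, the reducer body looks more or less like this (class and method 
names here are simplified stand-ins, and LogRecord's copy constructor is 
assumed, but the shape matches what the job actually does):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

public class WhoReducer extends Reducer<WhoKey, LogRecord, WhoKey, AggregateRecords> {

    @Override
    protected void reduce(WhoKey key, Iterable<LogRecord> values, Context context)
            throws IOException, InterruptedException {
        // AggregateRecords is essentially a Writable wrapper around a LinkedList<LogRecord>.
        AggregateRecords aggregate = new AggregateRecords();
        for (LogRecord record : values) {
            // The framework reuses the value instance between iterations,
            // so each record is copied before being kept.
            aggregate.add(new LogRecord(record));
        }
        // At this point every record for this key is in memory at once.
        context.write(key, aggregate);
    }
}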

However, I'm running into a problem. When a single person generates an 
inordinate number of records, all of them have to be held in memory, causing my 
heap space to run out. I could increase the heap size, but that doesn't really 
solve the problem: they could just generate more records and break it again. 
I've spent a lot of time thinking about how I could alter my setup so that no 
more than N records are held in memory at a time, but I can't come up with a 
way to do it.
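
To make the "no more than N records in memory" idea concrete, the closest thing 
I can picture is flushing partial aggregates every N records, roughly like this 
(purely illustrative; the cap value and class name are made up):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

public class ChunkingWhoReducer extends Reducer<WhoKey, LogRecord, WhoKey, AggregateRecords> {

    // Hypothetical cap on how many records are buffered per key at once.
    private static final int MAX_BUFFERED = 10000;

    @Override
    protected void reduce(WhoKey key, Iterable<LogRecord> values, Context context)
            throws IOException, InterruptedException {
        AggregateRecords buffer = new AggregateRecords();
        int count = 0;
        for (LogRecord record : values) {
            buffer.add(new LogRecord(record)); // copy; the framework reuses the instance
            if (++count >= MAX_BUFFERED) {
                // Emit a partial aggregate so the buffer never grows past the cap.
                context.write(key, buffer);
                buffer = new AggregateRecords();
                count = 0;
            }
        }
        if (count > 0) {
            context.write(key, buffer);
        }
    }
}

But then the OutputFormat receives several AggregateRecords for the same WhoKey 
and has to append them all to that person's file, which seems to defeat the 
point of aggregating in the reducer at all, so I'm not convinced this is the 
right direction.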

Is there something seriously wrong with how I am processing this? Should I have 
structured the job in a different way that would avoid this scenario? Isn't the 
MapReduce framework designed to operate on large data sets? Shouldn't it be 
managing the heap better?

Stderr and Stack Trace:

12/06/28 12:10:55 INFO mapred.JobClient:  map 100% reduce 67%
12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:10:58 INFO mapred.JobClient:  map 100% reduce 69%
12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:01 INFO mapred.JobClient:  map 100% reduce 71%
12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:06 INFO mapred.JobClient:  map 100% reduce 72%
12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
        at java.lang.Class.newInstance0(Class.java:355)
        at java.lang.Class.newInstance(Class.java:308)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
        at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
        at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)

P.S. I've already used jmap to dump the heap, trimmed each object down to its 
bare minimum, and confirmed there are no slow memory leaks.
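
(For completeness, that was along the lines of

    jmap -dump:format=b,file=reduce-task.hprof <task JVM pid>

followed by poking through the dump in a heap analyzer.)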
