Hey Matt,

As far as I can tell, Hadoop isn't really at fault here.

If your issue is that you collect the records in a list before you
store them, focus on that and avoid collecting them altogether. Why
not serialize each record as you receive it, since the incoming order
is already taken care of? As far as I can tell, your AggregateRecords
probably does nothing more than serialize the stored LinkedList. So
instead of using a LinkedList, or even a composite Writable such as
AggregateRecords, just write the records out as you receive them from
each .next() call (see the sketch below). Would this not work for
you? You could batch a constant number of records to gain some write
performance, but at least you won't exhaust your memory.
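
Something like this minimal sketch is what I have in mind; it assumes
you change the job's output value class (and your OutputFormat /
RecordWriter) to accept individual LogRecord values rather than an
AggregateRecords list. WhoKey and LogRecord are the types from your
description; the class name is made up.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

// Streams each value to the output as it comes off the iterator, so
// only one LogRecord needs to be live on the heap at a time.
public class StreamingReducer
    extends Reducer<WhoKey, LogRecord, WhoKey, LogRecord> {

  @Override
  protected void reduce(WhoKey key, Iterable<LogRecord> values,
      Context context) throws IOException, InterruptedException {
    for (LogRecord record : values) {
      context.write(key, record);
    }
  }
}

Your OutputFormat can still create one file per WhoKey; its
RecordWriter simply appends each LogRecord to that key's file instead
of receiving the whole list at once.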

You can write out records directly as you receive them by following
this FAQ entry:
http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F
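
Roughly, that approach looks like the sketch below, assuming your job
still uses a FileOutputFormat (so the task has a work output path that
is committed atomically). The per-key file name and the
LogRecord.toString() call are my assumptions; adapt them to your
record format.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Writes one side file per WhoKey under the task's work directory,
// streaming records into it instead of buffering them in memory.
public class DirectWriteReducer
    extends Reducer<WhoKey, LogRecord, NullWritable, NullWritable> {

  @Override
  protected void reduce(WhoKey key, Iterable<LogRecord> values,
      Context context) throws IOException, InterruptedException {
    // Files under the work output path are promoted to the job's
    // output directory only when the task commits, so failed or
    // speculative attempts don't leave partial files behind.
    Path dir = FileOutputFormat.getWorkOutputPath(context);
    Path file = new Path(dir, key.toString());
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    FSDataOutputStream out = fs.create(file);
    try {
      for (LogRecord record : values) {
        out.writeBytes(record.toString() + "\n");
      }
    } finally {
      out.close();
    }
  }
}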

On Fri, Jun 29, 2012 at 3:07 AM, Berry, Matt <mwbe...@amazon.com> wrote:
> I have a MapReduce job that reads in several gigs of log files and separates 
> the records based on who generated them. My MapReduce job looks like this:
>
> InputFormat: NLineInputFormat
> - Reads N lines of an input file, which is an array of URLs for log files to 
> download
> Mapper: <LongWritable, Text, WhoKey, LogRecord>
> - (LongWritable,Text) is coming from the input format. The Text is parsed 
> into an array of URLs. Each log is downloaded and the records extracted
> - WhoKey is just a multipart key that describes who caused a record to be 
> logged
> - LogRecord is the record that they logged, with all irrelevant information 
> purged
> Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
> - The Reducer iterates through the LogRecords for the WhoKey and adds them to 
> a LinkedList (AggregateRecords) and emits that to the output format
> OutputFormat: <WhoKey, AggregateRecords>
> - Creates a file for each WhoKey and writes the records into it
>
> However I'm running into a problem. When a single person generates an 
> inordinate number of records, all of them have to be held in memory causing 
> my heap space to run out. I could increase the heap size, but that will not 
> solve the problem as they could just generate more records and break it 
> again. I've spent a lot of time thinking about how I could alter my setup so 
> no more than N number of records are held in memory at a time, but I can't 
> think of a way to do it.
>
> Is there something seriously wrong with how I am processing this? Should I 
> have structured the job in a different way that would avoid this scenario? 
> Isn't the MapReduce framework designed to operate on large data sets? 
> Shouldn't it be managing the heap better?
>
> Stderr and Stack Trace:
>
> 12/06/28 12:10:55 INFO mapred.JobClient:  map 100% reduce 67%
> 12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:10:58 INFO mapred.JobClient:  map 100% reduce 69%
> 12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:01 INFO mapred.JobClient:  map 100% reduce 71%
> 12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:06 INFO mapred.JobClient:  map 100% reduce 72%
> 12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>        at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown 
> Source)
>        at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>        at java.lang.Class.newInstance0(Class.java:355)
>        at java.lang.Class.newInstance(Class.java:308)
>        at 
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
>        at 
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>        at 
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>        at 
> org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>        at 
> org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
>        at 
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
>        at 
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>        at 
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>        at 
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
>
> P.S. I already used jmap to dump the heap, to trim each object down to its 
> bare minimum, and to confirm there are no slow memory leaks.



-- 
Harsh J
