It is NEVER a good idea to hold all the items for a key in memory; after all,
this is big data and you want it to scale. I do not see what stops you from
reading one record, processing it, and writing it out without retaining it,
along the lines of the sketch below.
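Here is a minimal sketch of what I mean, reusing the WhoKey and LogRecord
names from your post (treat them as assumptions, I have not seen your code):
the reducer emits each record the moment it sees it and never buffers, so
heap use per key stays constant no matter how many records one person
generates.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: WhoKey and LogRecord are the types described in the post.
// Nothing is accumulated, so a key with millions of records needs no more
// heap than a key with one.
public class StreamingLogReducer
        extends Reducer<WhoKey, LogRecord, WhoKey, LogRecord> {

    @Override
    protected void reduce(WhoKey key, Iterable<LogRecord> records, Context context)
            throws IOException, InterruptedException {
        for (LogRecord record : records) {
            // Write each record immediately; the OutputFormat can append it
            // to the per-WhoKey file instead of receiving one big list.
            context.write(key, record);
        }
    }
}

Keep in mind that the value iterator reuses the same LogRecord instance on
each call to next(), so if you do buffer values you must deep-copy them;
streaming them straight through avoids that problem entirely.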
It is OK to keep running statistics while iterating through a key and to
output them at the end, but holding all the values for a key is almost never
a good idea unless you can guarantee a limit on how many there are. A second
sketch of that pattern follows.
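Again just a sketch with assumed names; here the only state kept per key is a
running count held in a single LongWritable, and one small summary record is
emitted once the key's values have been consumed.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: constant memory per key (just the running total), regardless
// of how many records arrive for that key.
public class LogStatsReducer
        extends Reducer<WhoKey, LogRecord, WhoKey, LongWritable> {

    private final LongWritable count = new LongWritable();

    @Override
    protected void reduce(WhoKey key, Iterable<LogRecord> records, Context context)
            throws IOException, InterruptedException {
        long n = 0;
        for (LogRecord record : records) {
            n++;                     // accumulate statistics only
        }
        count.set(n);
        context.write(key, count);   // one small summary record per key
    }
}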

On Thu, Jun 28, 2012 at 2:37 PM, Berry, Matt <mwbe...@amazon.com> wrote:

> I have a MapReduce job that reads in several gigs of log files and
> separates the records based on who generated them. My MapReduce job looks
> like this:
>
> InputFormat: NLineInputFormat
> - Reads N lines of an input file, which is an array of URLs for log files
> to download
> Mapper: <LongWritable, Text, WhoKey, LogRecord>
> - (LongWritable,Text) is coming from the input format. The Text is parsed
> into an array of URLs. Each log is downloaded and the records extracted
> - WhoKey is just a multipart key that describes who caused a record to be
> logged
> - LogRecord is the record that they logged, with all irrelevant
> information purged
> Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
> - The Reducer iterates through the LogRecords for the WhoKey and adds them
> to a LinkedList (AggregateRecords) and emits that to the output format
> OutputFormat: <WhoKey, AggregateRecords>
> - Creates a file for each WhoKey and writes the records into it
>
> However, I'm running into a problem. When a single person generates an
> inordinate number of records, all of them have to be held in memory, causing
> my heap space to run out. I could increase the heap size, but that will not
> solve the problem, as they could just generate more records and break it
> again. I've spent a lot of time thinking about how I could alter my setup
> so that no more than N records are held in memory at a time, but I can't
> think of a way to do it.
>
> Is there something seriously wrong with how I am processing this? Should I
> have structured the job in a different way that would avoid this scenario?
> Isn't the MapReduce framework designed to operate on large data sets?
> Shouldn't it be managing the heap better?
>
> Stderr and Stack Trace:
>
> 12/06/28 12:10:55 INFO mapred.JobClient:  map 100% reduce 67%
> 12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:10:58 INFO mapred.JobClient:  map 100% reduce 69%
> 12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:01 INFO mapred.JobClient:  map 100% reduce 71%
> 12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:06 INFO mapred.JobClient:  map 100% reduce 72%
> 12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>        at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown
> Source)
>        at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>        at java.lang.Class.newInstance0(Class.java:355)
>        at java.lang.Class.newInstance(Class.java:308)
>        at
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>        at
> org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>        at
> org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
>        at
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
>        at
> xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>        at
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>        at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
>
> P.S. I have already used jmap to dump the heap, trimmed each object down to
> its bare minimum, and confirmed that there are no slow memory leaks.
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
