It is NEVER a good idea to hold items in memory - after all, this is big data and you want it to scale. I do not see what stops you from reading one record, processing it, and writing it out without retaining it. It is OK to keep statistics while iterating through a key and output them at the end, but holding all the values for a key is almost never a good idea unless you can guarantee a limit on how many there are.
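Something along these lines would do it - this is only a rough sketch, assuming your own WhoKey and LogRecord Writables, and the class name StreamingLogReducer is made up for illustration. It writes every value straight through to the output format instead of building an AggregateRecords list, so heap use per key stays constant:

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only - WhoKey and LogRecord are your own Writable types;
// the class name here is hypothetical.
public class StreamingLogReducer
        extends Reducer<WhoKey, LogRecord, WhoKey, LogRecord> {

    @Override
    protected void reduce(WhoKey key, Iterable<LogRecord> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;                   // O(1) running statistic per key
        for (LogRecord record : values) {
            context.write(key, record);   // write through, retain nothing
            count++;
        }
        // Emit any per-key summary here instead of buffering the values.
        context.getCounter("logs", "recordsWritten").increment(count);
    }
}

Your OutputFormat can still create one file per WhoKey and append each record as it arrives - all values for a key reach the reducer consecutively, so at no point does more than one record need to live on the heap. MultipleOutputs (org.apache.hadoop.mapreduce.lib.output) is also worth a look if you want per-key files without writing a custom output format.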
On Thu, Jun 28, 2012 at 2:37 PM, Berry, Matt <mwbe...@amazon.com> wrote:
> I have a MapReduce job that reads in several gigs of log files and
> separates the records based on who generated them. My MapReduce job looks
> like this:
>
> InputFormat: NLineInputFormat
> - Reads N lines of an input file, which is an array of URLs for log files
>   to download
> Mapper: <LongWritable, Text, WhoKey, LogRecord>
> - (LongWritable, Text) is coming from the input format. The Text is parsed
>   into an array of URLs. Each log is downloaded and the records extracted.
> - WhoKey is just a multipart key that describes who caused a record to be
>   logged.
> - LogRecord is the record that they logged, with all irrelevant
>   information purged.
> Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
> - The Reducer iterates through the LogRecords for the WhoKey, adds them
>   to a LinkedList (AggregateRecords), and emits that to the output format.
> OutputFormat: <WhoKey, AggregateRecords>
> - Creates a file for each WhoKey and writes the records into it.
>
> However, I'm running into a problem. When a single person generates an
> inordinate number of records, all of them have to be held in memory,
> causing my heap space to run out. I could increase the heap size, but that
> will not solve the problem, as they could just generate more records and
> break it again. I've spent a lot of time thinking about how I could alter
> my setup so that no more than N records are held in memory at a time, but
> I can't think of a way to do it.
>
> Is there something seriously wrong with how I am processing this? Should I
> have structured the job in a different way that would avoid this scenario?
> Isn't the MapReduce framework designed to operate on large data sets?
> Shouldn't it be managing the heap better?
>
> Stderr and Stack Trace:
>
> 12/06/28 12:10:55 INFO mapred.JobClient: map 100% reduce 67%
> 12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:10:58 INFO mapred.JobClient: map 100% reduce 69%
> 12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:01 INFO mapred.JobClient: map 100% reduce 71%
> 12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:06 INFO mapred.JobClient: map 100% reduce 72%
> 12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>     at java.lang.Class.newInstance0(Class.java:355)
>     at java.lang.Class.newInstance(Class.java:308)
>     at xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>     at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>     at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
>     at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
>     at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
>
> P.S. Already used jmap to dump the heap, trim each object down to its
> bare minimum, and confirm there are no slow memory leaks.

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com