Just a guess... could it be that the bag is kept in memory instead
of being spilled to disk?
Browsing the code of InternalCachedBag, I saw:

private void init(int bagCount, float percent) {
        factory = TupleFactory.getInstance();
        mContents = new ArrayList<Tuple>();

        long max = Runtime.getRuntime().maxMemory();
        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
        cacheLimit = Integer.MAX_VALUE;

        // set limit to 0, if memusage is 0 or really really small.
        // then all tuples are put into disk
        if (maxMemUsage < 1) {
            cacheLimit = 0;
        }

        addDone = false;
    }

My guess is that cacheLimit stays at Integer.MAX_VALUE, so the bag tries to
keep everything in memory: maxMemUsage is too small to hold the data, but not
so small (< 1) that cacheLimit gets reset to 0 and everything goes to disk.
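
If that's what is happening, the two inputs to that calculation are the task
heap and the bag memory fraction (as far as I can tell, the "percent" here
comes from the pig.cachedbag.memusage property, default 0.1). A rough sketch
of the knobs to play with -- the values below are only an illustration:

-- smaller per-bag budget, so spilling should kick in sooner
set pig.cachedbag.memusage 0.05;
-- bigger task heap (this is what Ruslan's workaround already does)
set mapred.child.java.opts '-Xmx1024m';

If your Pig version doesn't accept these via SET, they can also go into
pig.properties.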




On Tue, Nov 22, 2011 at 10:08 AM, Ruslan Al-fakikh <
[email protected]> wrote:

> Jonathan,
>
> I am running it on the prod cluster in MR mode, not locally. I started to see
> the issue when the input size grew. A few days ago I found a workaround of
> setting this property:
> mapred.child.java.opts=-Xmx1024m
> But I think this is a temporary solution and the job will fail when the
> input size grows again.
>
> Dmitriy,
>
> Thanks a lot for the investigation. I'll try it.
>
> -----Original Message-----
> From: Dmitriy Ryaboy [mailto:[email protected]]
> Sent: 22 November 2011 2:21
> To: [email protected]
> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>
> Ok so this:
>
> thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
>     count = COUNT(thirdLevelsSummed);
>     result = TOP( (int)(count * (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)), 3, thirdLevelsSummed);
>     GENERATE FLATTEN(result);
> }
>
> requires "count" to be calculated before TOP can be applied. Since count
> can't be calculated until the reduce side, naturally, TOP can't start
> working on the map side (as it doesn't know its arguments yet).
>
> Try generating the counts * ($TLP + $BP) separately, joining them in (I am
> guessing you have no more than a few K categories -- in that case, you can
> do a replicated join), and then do the group and TOP on that.
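>
> A rough sketch of that restructuring -- the alias and field names here are
> made up (I don't have the actual schema), and the comparison column index
> passed to TOP will probably need adjusting after the join:
>
> grpd = GROUP thirdLevelsSummed BY category;
> counts = FOREACH grpd GENERATE group AS category,
>     (int)(COUNT(thirdLevelsSummed) * (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS topn;
>
> -- counts should only be a few K rows, so a fragment-replicate join is cheap
> joined = JOIN thirdLevelsSummed BY category, counts BY category USING 'replicated';
>
> regrouped = GROUP joined BY counts::category;
> thirdLevelsTopVisitorsWithBots = FOREACH regrouped {
>     -- every row in the group carries the same topn, so MAX just pulls it out as a scalar
>     lim = MAX(joined.counts::topn);
>     -- the '3' is copied from the original call; recheck it against the post-join columns
>     result = TOP((int)lim, 3, joined);
>     GENERATE FLATTEN(result);
> };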
>
> On Mon, Nov 21, 2011 at 1:53 PM, Jonathan Coveney <[email protected]>
> wrote:
> > You're right pablomar...hmm
> >
> > Ruslan: are you running this in mr mode on a cluster, or locally?
> >
> > I'm noticing this:
> > [2011-11-16 12:34:55] INFO  (SpillableMemoryManager.java:154) - first
> > memory handler call- Usage threshold init = 175308800(171200K) used =
> > 373454552(364701K) committed = 524288000(512000K) max =
> > 524288000(512000K)
> >
> > It looks like your max memory is 512MB. I've had issues with bag
> > spilling with less than 1GB allocated (-Xmx1024m).
> >
> > 2011/11/21 pablomar <[email protected]>
> >
> >> I might be wrong, but it seems the error comes from
> >> while(itr.hasNext()),
> >> not from the add to the queue,
> >> so I don't think it is related to the number of elements in the queue
> >> ... maybe the field length?
> >>
> >> On 11/21/11, Jonathan Coveney <[email protected]> wrote:
> >> > Internally, TOP is using a priority queue. It tries to be smart
> >> > about pulling off excess elements, but if you ask it for enough
> >> > elements, it can blow up, because the priority queue is going to
> >> > have n elements, where n is the ranking you want. This is consistent
> >> > with the stack trace, which died in updateTop, which is when elements
> >> > are added to the priority queue.
> >> >
> >> > Ruslan, how large are the limits you're setting? i.e. (int)(count *
> >> > (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE))
> >> >
> >> > As far as TOP's implementation, I imagine you could get around the
> >> > issue by using a sorted data bag, but that might be much slower. Hmm.
> >> >
> >> > 2011/11/21 Ruslan Al-fakikh <[email protected]>
> >> >
> >> >> Ok. Here it is:
> >> >> https://gist.github.com/1383266
> >> >>
> >> >> -----Original Message-----
> >> >> From: Dmitriy Ryaboy [mailto:[email protected]]
> >> >> Sent: 21 November 2011 20:32
> >> >> To: [email protected]
> >> >> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> >> >>
> >> >> Ruslan, I think the mailing list is set to reject attachments --
> >> >> can you post it as a github gist or something similar, and send a link?
> >> >>
> >> >> D
> >> >>
> >> >> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh
> >> >> <[email protected]> wrote:
> >> >> > Hey Dmitriy,
> >> >> >
> >> >> > I attached the script. It is not a plain Pig script, because I
> >> >> > do some preprocessing before submitting it to the cluster, but the
> >> >> > general idea of what I submit is clear.
> >> >> >
> >> >> > Thanks in advance!
> >> >> >
> >> >> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy
> >> >> > <[email protected]>
> >> >> wrote:
> >> >> >> Ok, so it's something in the rest of the script that's causing
> >> >> >> this to happen. Ruslan, if you send your script, I can probably
> >> >> >> figure out why (usually, it's using another, non-algebraic udf
> >> >> >> in your foreach, or for pig 0.8, generating a constant in the foreach).
> >> >> >>
> >> >> >> D
> >> >> >>
> >> >> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar
> >> >> >> <[email protected]> wrote:
> >> >> >>> according to the stack trace, the algebraic is not being used
> >> >> >>> it says
> >> >> >>> updateTop(Top.java:139)
> >> >> >>> exec(Top.java:116)
> >> >> >>>
> >> >> >>> On 11/17/11, Dmitriy Ryaboy <[email protected]> wrote:
> >> >> >>>> The top udf does not try to process all data in memory if the
> >> >> >>>> algebraic optimization can be applied. It does need to keep
> >> >> >>>> the topn numbers in memory of course. Can you confirm
> >> >> >>>> algebraic mode is
> >> >> used?
> >> >> >>>>
> >> >> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh"
> >> >> >>>> <[email protected]>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Hey guys,
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> I encounter java.lang.OutOfMemoryError when using TOP udf.
> >> >> >>>>> It seems that the udf tries to process all data in memory.
> >> >> >>>>>
> >> >> >>>>> Is there a workaround for TOP? Or maybe there is some other
> >> >> >>>>> way of getting top results? I cannot use LIMIT since I need
> >> >> >>>>> 5% of the data, not a constant number of rows.
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> I am using:
> >> >> >>>>>
> >> >> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> The stack trace is:
> >> >> >>>>>
> >> >> >>>>> [2011-11-16 12:34:55] INFO  (CodecPool.java:128) - Got brand-new decompressor
> >> >> >>>>> [2011-11-16 12:34:55] INFO  (Merger.java:473) - Down to the last merge-pass, with 21 segments left of total size: 2057257173 bytes
> >> >> >>>>> [2011-11-16 12:34:55] INFO  (SpillableMemoryManager.java:154) - first memory handler call- Usage threshold init = 175308800(171200K) used = 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
> >> >> >>>>> [2011-11-16 12:36:22] INFO  (SpillableMemoryManager.java:167) - first memory handler call - Collection threshold init = 175308800(171200K) used = 496500704(484863K) committed = 524288000(512000K) max = 524288000(512000K)
> >> >> >>>>> [2011-11-16 12:37:28] INFO  (TaskLogsTruncater.java:69) - Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
> >> >> >>>>>
> >> >> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error running child :
> >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> >> >> >>>>>                at java.util.Arrays.copyOfRange(Arrays.java:3209)
> >> >> >>>>>                at java.lang.String.<init>(String.java:215)
> >> >> >>>>>                at java.io.DataInputStream.readUTF(DataInputStream.java:644)
> >> >> >>>>>                at java.io.DataInputStream.readUTF(DataInputStream.java:547)
> >> >> >>>>>                at org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
> >> >> >>>>>                at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
> >> >> >>>>>                at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
> >> >> >>>>>                at org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
> >> >> >>>>>                at org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
> >> >> >>>>>                at org.apache.pig.data.InternalCachedBag$CachedBagIterator.hasNext(InternalCachedBag.java:237)
> >> >> >>>>>                at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
> >> >> >>>>>                at org.apache.pig.builtin.TOP.exec(TOP.java:116)
> >> >> >>>>>                at org.apache.pig.builtin.TOP.exec(TOP.java:65)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
> >> >> >>>>>                at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
> >> >> >>>>>                at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> >> >> >>>>>                at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
> >> >> >>>>>                at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
> >> >> >>>>>                at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >> >> >>>>>                at java.security.AccessController.doPrivileged(Native Method)
> >> >> >>>>>                at javax.security.auth.Subject.doAs(Subject.java:396)
> >> >> >>>>>                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >> >> >>>>>                at org.apache.hadoop.mapred.Child.main(Child.java:264)
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> stderr logs
> >> >> >>>>>
> >> >> >>>>> Exception in thread "Low Memory Detector"
> >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> >> >> >>>>>                at sun.management.MemoryNotifInfoCompositeData.getCompositeData(MemoryNotifInfoCompositeData.java:42)
> >> >> >>>>>                at sun.management.MemoryNotifInfoCompositeData.toCompositeData(MemoryNotifInfoCompositeData.java:36)
> >> >> >>>>>                at sun.management.MemoryImpl.createNotification(MemoryImpl.java:168)
> >> >> >>>>>                at sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.java:300)
> >> >> >>>>>                at sun.management.Sensor.trigger(Sensor.java:120)
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> Thanks in advance!
> >> >> >>>>>
> >> >> >>>>
> >> >> >>>
> >> >> >>
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > Best Regards,
> >> >> > Ruslan Al-Fakikh
> >> >> >
> >> >>
> >> >>
> >> >
> >>
> >
>
>
