Internally, TOP uses a priority queue. It tries to be smart about
pulling off excess elements, but if you ask it for enough elements, it can
blow up, because the priority queue will hold n elements, where n is
the rank you want. This is consistent with the stack trace, which died
in updateTop, which is where elements are added to the priority queue.
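To illustrate (this is a conceptual sketch in Python, not Pig's actual Java code), a bounded-heap top-n keeps exactly n elements resident, so memory grows linearly with the n you request:

```python
import heapq

def top_n(records, n):
    """Keep at most n records in a min-heap of the largest values seen.

    The heap always holds n elements once warmed up -- which is why asking
    for a very large n can exhaust the JVM heap in a UDF built this way.
    """
    heap = []  # min-heap; heap[0] is the smallest of the current top n
    for r in records:
        if len(heap) < n:
            heapq.heappush(heap, r)
        elif r > heap[0]:
            heapq.heapreplace(heap, r)  # evict the smallest, keep size at n
    return sorted(heap, reverse=True)
```

Note the working set is O(n), independent of the input size, so the failure mode appears only when n itself is large.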

Ruslan, how large are the limits you're setting? I.e. (int)(count * (double)
($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) )

As far as TOP's implementation goes, I imagine you could get around the issue
by using a sorted data bag, but that might be much slower. Hmm.
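A rough sketch of that alternative (again illustrative Python, not Pig's sorted-bag code, and the names here are made up): maintaining a sorted buffer gives the same answer but pays O(len) per insert for the shifting, which is the "much slower" part; the upside is that a spillable sorted bag could page to disk instead of holding everything on the JVM heap.

```python
import bisect

def top_n_sorted(records, n):
    """Top-n via a sorted list, analogous to a sorted data bag.

    bisect.insort keeps buf in ascending order but shifts elements on every
    insert (O(len) per record), so this is slower than a heap; a disk-backed
    sorted bag could trade that slowness for bounded memory.
    """
    buf = []
    for r in records:
        bisect.insort(buf, r)
        if len(buf) > n:
            buf.pop(0)  # drop the current smallest
    return sorted(buf, reverse=True)
```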

2011/11/21 Ruslan Al-fakikh <[email protected]>

> Ok. Here it is:
> https://gist.github.com/1383266
>
> -----Original Message-----
> From: Dmitriy Ryaboy [mailto:[email protected]]
> Sent: 21 November 2011 20:32
> To: [email protected]
> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>
> Ruslan, I think the mailing list is set to reject attachments -- can you
> post it as a github gist or something similar, and send a link?
>
> D
>
> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh
> <[email protected]> wrote:
> > Hey Dmitriy,
> >
> > I attached the script. It is not a plain Pig script, because I do
> > some preprocessing before submitting it to the cluster, but the general
> > idea of what I submit is clear.
> >
> > Thanks in advance!
> >
> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy <[email protected]>
> wrote:
> >> Ok, so it's something in the rest of the script that's causing this
> >> to happen. Ruslan, if you send your script, I can probably figure out
> >> why (usually, it's using another, non-algebraic udf in your foreach,
> >> or for pig 0.8, generating a constant in the foreach).
> >>
> >> D
> >>
> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar
> >> <[email protected]> wrote:
> >>> According to the stack trace, the algebraic mode is not being used; it
> >>> says:
> >>> updateTop(Top.java:139)
> >>> exec(Top.java:116)
> >>>
> >>> On 11/17/11, Dmitriy Ryaboy <[email protected]> wrote:
> >>>> The TOP udf does not try to process all data in memory if the
> >>>> algebraic optimization can be applied. It does need to keep the
> >>>> top n elements in memory, of course. Can you confirm algebraic mode is
> >>>> used?
> >>>>
> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh"
> >>>> <[email protected]>
> >>>> wrote:
> >>>>
> >>>>> Hey guys,
> >>>>>
> >>>>>
> >>>>>
> >>>>> I encounter java.lang.OutOfMemoryError when using TOP udf. It
> >>>>> seems that the udf tries to process all data in memory.
> >>>>>
> >>>>> Is there a workaround for TOP? Or maybe there is some other way of
> >>>>> getting top results? I cannot use LIMIT since I need the top 5% of
> >>>>> the data, not a constant number of rows.
> >>>>>
> >>>>>
> >>>>>
> >>>>> I am using:
> >>>>>
> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
> >>>>>
> >>>>>
> >>>>>
> >>>>> The stack trace is:
> >>>>>
> >>>>> [2011-11-16 12:34:55] INFO  (CodecPool.java:128) - Got brand-new decompressor
> >>>>>
> >>>>> [2011-11-16 12:34:55] INFO  (Merger.java:473) - Down to the last merge-pass, with 21 segments left of total size: 2057257173 bytes
> >>>>>
> >>>>> [2011-11-16 12:34:55] INFO  (SpillableMemoryManager.java:154) - first memory handler call- Usage threshold init = 175308800(171200K) used = 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
> >>>>>
> >>>>> [2011-11-16 12:36:22] INFO  (SpillableMemoryManager.java:167) - first memory handler call - Collection threshold init = 175308800(171200K) used = 496500704(484863K) committed = 524288000(512000K) max = 524288000(512000K)
> >>>>>
> >>>>> [2011-11-16 12:37:28] INFO  (TaskLogsTruncater.java:69) - Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
> >>>>>
> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error running child :
> >>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>         at java.util.Arrays.copyOfRange(Arrays.java:3209)
> >>>>>         at java.lang.String.<init>(String.java:215)
> >>>>>         at java.io.DataInputStream.readUTF(DataInputStream.java:644)
> >>>>>         at java.io.DataInputStream.readUTF(DataInputStream.java:547)
> >>>>>         at org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
> >>>>>         at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
> >>>>>         at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
> >>>>>         at org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
> >>>>>         at org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
> >>>>>         at org.apache.pig.data.InternalCachedBag$CachedBagIterator.hasNext(InternalCachedBag.java:237)
> >>>>>         at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
> >>>>>         at org.apache.pig.builtin.TOP.exec(TOP.java:116)
> >>>>>         at org.apache.pig.builtin.TOP.exec(TOP.java:65)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
> >>>>>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> >>>>>         at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
> >>>>>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
> >>>>>         at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >>>>>         at java.security.AccessController.doPrivileged(Native Method)
> >>>>>         at javax.security.auth.Subject.doAs(Subject.java:396)
> >>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >>>>>         at org.apache.hadoop.mapred.Child.main(Child.java:264)
> >>>>>
> >>>>> stderr logs
> >>>>>
> >>>>> Exception in thread "Low Memory Detector"
> >>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>         at sun.management.MemoryNotifInfoCompositeData.getCompositeData(MemoryNotifInfoCompositeData.java:42)
> >>>>>         at sun.management.MemoryNotifInfoCompositeData.toCompositeData(MemoryNotifInfoCompositeData.java:36)
> >>>>>         at sun.management.MemoryImpl.createNotification(MemoryImpl.java:168)
> >>>>>         at sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.java:300)
> >>>>>         at sun.management.Sensor.trigger(Sensor.java:120)
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> Thanks in advance!
> >>>>>
> >>>>
> >>>
> >>
> >
> >
> >
> > --
> > Best Regards,
> > Ruslan Al-Fakikh
> >
>
>
