Ok so this:
thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
    count = COUNT(thirdLevelsSummed);
    result = TOP((int)(count * (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)),
                 3, thirdLevelsSummed);
    GENERATE FLATTEN(result);
}
requires "count" to be calculated before TOP can be applied. Since
count can't be calculated until the reduce side, TOP naturally can't
start working on the map side (it doesn't know its arguments yet).
Try generating count * ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)
separately, joining it back in (I am guessing you have no more than a
few K categories -- in that case, you can do a replicated join), and
then doing the group and TOP on the joined relation.
On Mon, Nov 21, 2011 at 1:53 PM, Jonathan Coveney <[email protected]> wrote:
> You're right pablomar...hmm
>
> Ruslan: are you running this in mr mode on a cluster, or locally?
>
> I'm noticing this:
> [2011-11-16 12:34:55] INFO (SpillableMemoryManager.java:154) - first memory
> handler call- Usage threshold init = 175308800(171200K) used =
> 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
>
> It looks like your max memory is 512MB. I've had issues with bag spilling
> with less than 1GB allocated (-Xmx1024m).
>
> 2011/11/21 pablomar <[email protected]>
>
>> i might be wrong, but it seems the error comes from
>> while(itr.hasNext())
>> not from the add to the queue
>> so i don't think it is related to the number of elements in the queue
>> ... maybe the field length?
>>
>> On 11/21/11, Jonathan Coveney <[email protected]> wrote:
>> > Internally, TOP uses a priority queue. It tries to be smart about
>> > pulling off excess elements, but if you ask it for enough elements,
>> > it can blow up, because the priority queue is going to have n
>> > elements, where n is the ranking you want. This is consistent with
>> > the stack trace, which died in updateTop, which is where elements
>> > are added to the priority queue.
>> >
>> > Ruslan, how large are the limits you're setting? i.e.
>> > (int)(count * (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE))
>> >
>> > As far as TOP's implementation, I imagine you could get around the
>> > issue by using a sorted data bag, but that might be much slower. Hmm.
>> >
>> > 2011/11/21 Ruslan Al-fakikh <[email protected]>
>> >
>> >> Ok. Here it is:
>> >> https://gist.github.com/1383266
>> >>
>> >> -----Original Message-----
>> >> From: Dmitriy Ryaboy [mailto:[email protected]]
>> >> Sent: November 21, 2011, 20:32
>> >> To: [email protected]
>> >> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>> >>
>> >> Ruslan, I think the mailing list is set to reject attachments -- can you
>> >> post it as a github gist or something similar, and send a link?
>> >>
>> >> D
>> >>
>> >> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh
>> >> <[email protected]> wrote:
>> >> > Hey Dmitriy,
>> >> >
>> >> > I attached the script. It is not a plain-pig script, because I make
>> >> > some preprocessing before submitting it to cluster, but the general
>> >> > idea of what I submit is clear.
>> >> >
>> >> > Thanks in advance!
>> >> >
>> >> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy <[email protected]>
>> >> wrote:
>> >> >> Ok, so it's something in the rest of the script that's causing this
>> >> >> to happen. Ruslan, if you send your script, I can probably figure out
>> >> >> why (usually, it's using another, non-agebraic udf in your foreach,
>> >> >> or for pig 0.8, generating a constant in the foreach).
>> >> >>
>> >> >> D
>> >> >>
>> >> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar
>> >> >> <[email protected]> wrote:
>> >> >>> according to the stack trace, the algebraic implementation is not
>> >> >>> being used; it says
>> >> >>> updateTop(TOP.java:139)
>> >> >>> exec(TOP.java:116)
>> >> >>> On 11/17/11, Dmitriy Ryaboy <[email protected]> wrote:
>> >> >>>> The top udf does not try to process all data in memory if the
>> >> >>>> algebraic optimization can be applied. It does need to keep the
>> >> >>>> topn numbers in memory of course. Can you confirm algebraic mode is
>> >> used?
>> >> >>>>
>> >> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh"
>> >> >>>> <[email protected]>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Hey guys,
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> I encounter java.lang.OutOfMemoryError when using the TOP udf. It
>> >> >>>>> seems that the udf tries to process all data in memory.
>> >> >>>>>
>> >> >>>>> Is there a workaround for TOP? Or maybe there is some other way of
>> >> >>>>> getting top results? I cannot use LIMIT since I need the top 5% of
>> >> >>>>> the data, not a constant number of rows.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> I am using:
>> >> >>>>>
>> >> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> The stack trace is:
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:34:55] INFO (CodecPool.java:128) - Got brand-new decompressor
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:34:55] INFO (Merger.java:473) - Down to the last merge-pass, with 21 segments left of total size: 2057257173 bytes
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:34:55] INFO (SpillableMemoryManager.java:154) - first memory handler call - Usage threshold init = 175308800(171200K) used = 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:36:22] INFO (SpillableMemoryManager.java:167) - first memory handler call - Collection threshold init = 175308800(171200K) used = 496500704(484863K) committed = 524288000(512000K) max = 524288000(512000K)
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:37:28] INFO (TaskLogsTruncater.java:69) - Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
>> >> >>>>>
>> >> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error running child:
>> >> >>>>> java.lang.OutOfMemoryError: Java heap space
>> >> >>>>>     at java.util.Arrays.copyOfRange(Arrays.java:3209)
>> >> >>>>>     at java.lang.String.<init>(String.java:215)
>> >> >>>>>     at java.io.DataInputStream.readUTF(DataInputStream.java:644)
>> >> >>>>>     at java.io.DataInputStream.readUTF(DataInputStream.java:547)
>> >> >>>>>     at org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
>> >> >>>>>     at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
>> >> >>>>>     at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
>> >> >>>>>     at org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
>> >> >>>>>     at org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
>> >> >>>>>     at org.apache.pig.data.InternalCachedBag$CachedBagIterator.hasNext(InternalCachedBag.java:237)
>> >> >>>>>     at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
>> >> >>>>>     at org.apache.pig.builtin.TOP.exec(TOP.java:116)
>> >> >>>>>     at org.apache.pig.builtin.TOP.exec(TOP.java:65)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
>> >> >>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
>> >> >>>>>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>> >> >>>>>     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
>> >> >>>>>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
>> >> >>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>> >> >>>>>     at java.security.AccessController.doPrivileged(Native Method)
>> >> >>>>>     at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> >>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>> >> >>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:264)
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> stderr logs
>> >> >>>>>
>> >> >>>>> Exception in thread "Low Memory Detector"
>> >> >>>>> java.lang.OutOfMemoryError: Java heap space
>> >> >>>>>     at sun.management.MemoryNotifInfoCompositeData.getCompositeData(MemoryNotifInfoCompositeData.java:42)
>> >> >>>>>     at sun.management.MemoryNotifInfoCompositeData.toCompositeData(MemoryNotifInfoCompositeData.java:36)
>> >> >>>>>     at sun.management.MemoryImpl.createNotification(MemoryImpl.java:168)
>> >> >>>>>     at sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.java:300)
>> >> >>>>>     at sun.management.Sensor.trigger(Sensor.java:120)
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> Thanks in advance!
>> >> >>>>>
>> >> >>>>
>> >> >>>
>> >> >>
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Best Regards,
>> >> > Ruslan Al-Fakikh
>> >> >
>> >>
>> >>
>> >
>>
>