I meant the latter, an actual join statement. So, generate the counts, join them to the original relation, then group again and do TOP.
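Roughly like this (an untested sketch -- I'm assuming thirdLevelsByCategory came from something like GROUP thirdLevelsSummed BY category, so the field name 'category' and the column positions below are guesses you'd need to adapt to your real schema):

-- one small row per category, with the cutoff pre-computed
categoryCutoffs = FOREACH thirdLevelsByCategory GENERATE
        group AS category,
        (int)(COUNT(thirdLevelsSummed) * (double)($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS topNumber;

-- attach the cutoff to every row; with only a few K categories the
-- cutoff relation fits in memory, so a replicated (map-side) join works
joined = JOIN thirdLevelsSummed BY category,
              categoryCutoffs BY category USING 'replicated';

-- group again, carrying the cutoff in the group key so TOP gets its first
-- argument without having to count the bag first
regrouped = GROUP joined BY (thirdLevelsSummed::category, categoryCutoffs::topNumber);

-- group is now (category, topNumber); the joined tuples keep the original
-- thirdLevelsSummed columns first, so sort column 3 means the same thing as before
thirdLevelsTopVisitorsWithBots = FOREACH regrouped
        GENERATE FLATTEN(TOP(group.$1, 3, joined));

I haven't run this, so check the plan with EXPLAIN, but it should make the per-category cutoff available before TOP is invoked, instead of being computed from the very bag TOP is about to consume.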
D

On Fri, Dec 16, 2011 at 5:32 AM, Ruslan Al-fakikh <[email protected]> wrote:
> Dmitriy,
>
> You wrote
>
>> > Ok so this:
>> >
>> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
>> >         count = COUNT(thirdLevelsSummed);
>> >         result = TOP( (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
>> >         GENERATE FLATTEN(result);
>> > }
>> >
>> > requires "count" to be calculated before TOP can be applied. Since count can't be calculated until the reduce side, naturally, TOP can't start working on the map side (as it doesn't know its arguments yet). Try generating the counts * ($TLP + $BP) separately, joining them in (I am guessing you have no more than a few K categories -- in that case, you can do a replicated join), and then do group and TOP on that.
>
> Probably I didn't understand your logic correctly. What I did is: changed this:
>
> thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
>         count = COUNT(thirdLevelsSummed);
>         result = TOP( (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
>         GENERATE FLATTEN(result);
> }
>
> to this:
>
> thirdLevelsTopNumberCounted = FOREACH thirdLevelsByCategory GENERATE
>         group,
>         thirdLevelsSummed,
>         (int)( COUNT(thirdLevelsSummed) * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ) AS TopNumber;
>
> thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsTopNumberCounted GENERATE
>         FLATTEN(TOP(TopNumber, 3, thirdLevelsSummed));
>
> So I removed the COUNT from the nested FOREACH. It didn't help. Probably you meant the JOIN ... USING 'replicated' statement, but I didn't get how I can apply it here.
>
> Thanks
>
> -----Original Message-----
> From: Ruslan Al-fakikh [mailto:[email protected]]
> Sent: 24 November 2011 15:56
> To: [email protected]
> Subject: RE: java.lang.OutOfMemoryError when using TOP udf
>
> Hm. Interesting. Yeah, I really haven't seen the error after setting mapred.child.java.opts=-Xmx1024m. Probably I won't have to fix the Pig script :)
>
> -----Original Message-----
> From: Jonathan Coveney [mailto:[email protected]]
> Sent: 23 November 2011 11:46
> To: [email protected]
> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>
> I have seen issues with spilling if it had less than 1GB of heap. Once I allocated enough RAM, no issues. It seems unlikely to me that the bag implementation fails on this, because it's such a common use and nobody has reported an error, and running with less than 1GB of heap is definitely not recommended. Very curious if the error crops up again.
>
> 2011/11/22 pablomar <[email protected]>
>
>> Just a guess... could it be possible that the bag is kept in memory instead of being spilled to disk?
>> Browsing the code of InternalCachedBag, I saw:
>>
>>     private void init(int bagCount, float percent) {
>>         factory = TupleFactory.getInstance();
>>         mContents = new ArrayList<Tuple>();
>>
>>         long max = Runtime.getRuntime().maxMemory();
>>         maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
>>         cacheLimit = Integer.MAX_VALUE;
>>
>>         // set limit to 0, if memusage is 0 or really really small.
>>         // then all tuples are put into disk
>>         if (maxMemUsage < 1) {
>>             cacheLimit = 0;
>>         }
>>
>>         addDone = false;
>>     }
>>
>> My guess is that cacheLimit was set to Integer.MAX_VALUE and it's trying to keep everything in memory when the heap is not big enough, but not so small as to have cacheLimit reset to 0.
>>
>> On Tue, Nov 22, 2011 at 10:08 AM, Ruslan Al-fakikh <[email protected]> wrote:
>>
>> > Jonathan,
>> >
>> > I am running it on the prod cluster in MR mode, not locally. I started to see the issue when the input size grew. A few days ago I found a workaround of setting this property: mapred.child.java.opts=-Xmx1024m
>> > But I think this is a temporary solution and the job will fail when the input size grows again.
>> >
>> > Dmitriy,
>> >
>> > Thanks a lot for the investigation. I'll try it.
>> >
>> > -----Original Message-----
>> > From: Dmitriy Ryaboy [mailto:[email protected]]
>> > Sent: 22 November 2011 2:21
>> > To: [email protected]
>> > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>> >
>> > Ok so this:
>> >
>> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
>> >         count = COUNT(thirdLevelsSummed);
>> >         result = TOP( (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
>> >         GENERATE FLATTEN(result);
>> > }
>> >
>> > requires "count" to be calculated before TOP can be applied. Since count can't be calculated until the reduce side, naturally, TOP can't start working on the map side (as it doesn't know its arguments yet).
>> >
>> > Try generating the counts * ($TLP + $BP) separately, joining them in (I am guessing you have no more than a few K categories -- in that case, you can do a replicated join), and then do group and TOP on that.
>> >
>> > On Mon, Nov 21, 2011 at 1:53 PM, Jonathan Coveney <[email protected]> wrote:
>> > > You're right pablomar... hmm
>> > >
>> > > Ruslan: are you running this in MR mode on a cluster, or locally?
>> > >
>> > > I'm noticing this:
>> > > [2011-11-16 12:34:55] INFO (SpillableMemoryManager.java:154) - first memory handler call- Usage threshold init = 175308800(171200K) used = 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
>> > >
>> > > It looks like your max memory is 512MB. I've had issues with bag spilling with less than 1GB allocated (-Xmx1024m).
>> > >
>> > > 2011/11/21 pablomar <[email protected]>
>> > >
>> > >> I might be wrong, but it seems the error comes from while(itr.hasNext()), not from the add to the queue, so I don't think it is related to the number of elements in the queue... maybe the field length?
>> > >>
>> > >> On 11/21/11, Jonathan Coveney <[email protected]> wrote:
>> > >> > Internally, TOP is using a priority queue. It tries to be smart about pulling off excess elements, but if you ask it for enough elements, it can blow up, because the priority queue is going to have n elements, where n is the ranking you want. This is consistent with the stack trace, which died in updateTop, which is where elements are added to the priority queue.
>> > >> >
>> > >> > Ruslan, how large are the limits you're setting?
>> > >> > ie (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) )
>> > >> >
>> > >> > As far as TOP's implementation, I imagine you could get around the issue by using a sorted data bag, but that might be much slower. Hmm.
>> > >> >
>> > >> > 2011/11/21 Ruslan Al-fakikh <[email protected]>
>> > >> >
>> > >> >> Ok. Here it is: https://gist.github.com/1383266
>> > >> >>
>> > >> >> -----Original Message-----
>> > >> >> From: Dmitriy Ryaboy [mailto:[email protected]]
>> > >> >> Sent: 21 November 2011 20:32
>> > >> >> To: [email protected]
>> > >> >> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>> > >> >>
>> > >> >> Ruslan, I think the mailing list is set to reject attachments -- can you post it as a GitHub gist or something similar, and send a link?
>> > >> >>
>> > >> >> D
>> > >> >>
>> > >> >> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh <[email protected]> wrote:
>> > >> >> > Hey Dmitriy,
>> > >> >> >
>> > >> >> > I attached the script. It is not a plain Pig script, because I do some preprocessing before submitting it to the cluster, but the general idea of what I submit is clear.
>> > >> >> >
>> > >> >> > Thanks in advance!
>> > >> >> >
>> > >> >> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy <[email protected]> wrote:
>> > >> >> >> Ok, so it's something in the rest of the script that's causing this to happen. Ruslan, if you send your script, I can probably figure out why (usually, it's using another, non-algebraic udf in your foreach, or for Pig 0.8, generating a constant in the foreach).
>> > >> >> >>
>> > >> >> >> D
>> > >> >> >>
>> > >> >> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar <[email protected]> wrote:
>> > >> >> >>> According to the stack trace, the algebraic mode is not being used; it says
>> > >> >> >>> updateTop(Top.java:139)
>> > >> >> >>> exec(Top.java:116)
>> > >> >> >>>
>> > >> >> >>> On 11/17/11, Dmitriy Ryaboy <[email protected]> wrote:
>> > >> >> >>>> The TOP udf does not try to process all data in memory if the algebraic optimization can be applied. It does need to keep the top n elements in memory, of course. Can you confirm algebraic mode is used?
>> > >> >> >>>>
>> > >> >> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh" <[email protected]> wrote:
>> > >> >> >>>>
>> > >> >> >>>>> Hey guys,
>> > >> >> >>>>>
>> > >> >> >>>>> I encounter java.lang.OutOfMemoryError when using the TOP udf. It seems that the udf tries to process all data in memory.
>> > >> >> >>>>>
>> > >> >> >>>>> Is there a workaround for TOP? Or maybe there is some other way of getting top results? I cannot use LIMIT since I need 5% of the data, not a constant number of rows.
>> > >> >> >>>>>
>> > >> >> >>>>> I am using:
>> > >> >> >>>>>
>> > >> >> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
>> > >> >> >>>>>
>> > >> >> >>>>> The stack trace is:
>> > >> >> >>>>>
>> > >> >> >>>>> [2011-11-16 12:34:55] INFO (CodecPool.java:128) - Got brand-new decompressor
>> > >> >> >>>>> [2011-11-16 12:34:55] INFO (Merger.java:473) - Down to the last merge-pass, with 21 segments left of total size: 2057257173 bytes
>> > >> >> >>>>> [2011-11-16 12:34:55] INFO (SpillableMemoryManager.java:154) - first memory handler call- Usage threshold init = 175308800(171200K) used = 373454552(364701K) committed = 524288000(512000K) max = 524288000(512000K)
>> > >> >> >>>>> [2011-11-16 12:36:22] INFO (SpillableMemoryManager.java:167) - first memory handler call - Collection threshold init = 175308800(171200K) used = 496500704(484863K) committed = 524288000(512000K) max = 524288000(512000K)
>> > >> >> >>>>> [2011-11-16 12:37:28] INFO (TaskLogsTruncater.java:69) - Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
>> > >> >> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error running child : java.lang.OutOfMemoryError: Java heap space
>> > >> >> >>>>>         at java.util.Arrays.copyOfRange(Arrays.java:3209)
>> > >> >> >>>>>         at java.lang.String.<init>(String.java:215)
>> > >> >> >>>>>         at java.io.DataInputStream.readUTF(DataInputStream.java:644)
>> > >> >> >>>>>         at java.io.DataInputStream.readUTF(DataInputStream.java:547)
>> > >> >> >>>>>         at org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
>> > >> >> >>>>>         at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
>> > >> >> >>>>>         at org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
>> > >> >> >>>>>         at org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
>> > >> >> >>>>>         at org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
>> > >> >> >>>>>         at org.apache.pig.data.InternalCachedBag$CachedBagIterator.hasNext(InternalCachedBag.java:237)
>> > >> >> >>>>>         at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
>> > >> >> >>>>>         at org.apache.pig.builtin.TOP.exec(TOP.java:116)
>> > >> >> >>>>>         at org.apache.pig.builtin.TOP.exec(TOP.java:65)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
>> > >> >> >>>>>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
>> > >> >> >>>>>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>> > >> >> >>>>>         at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
>> > >> >> >>>>>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
>> > >> >> >>>>>         at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>> > >> >> >>>>>         at java.security.AccessController.doPrivileged(Native Method)
>> > >> >> >>>>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>> > >> >> >>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>> > >> >> >>>>>         at org.apache.hadoop.mapred.Child.main(Child.java:264)
>> > >> >> >>>>>
>> > >> >> >>>>> stderr logs
>> > >> >> >>>>>
>> > >> >> >>>>> Exception in thread "Low Memory Detector" java.lang.OutOfMemoryError: Java heap space
>> > >> >> >>>>>         at sun.management.MemoryNotifInfoCompositeData.getCompositeData(MemoryNotifInfoCompositeData.java:42)
>> > >> >> >>>>>         at sun.management.MemoryNotifInfoCompositeData.toCompositeData(MemoryNotifInfoCompositeData.java:36)
>> > >> >> >>>>>         at sun.management.MemoryImpl.createNotification(MemoryImpl.java:168)
>> > >> >> >>>>>         at sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.java:300)
>> > >> >> >>>>>         at sun.management.Sensor.trigger(Sensor.java:120)
>> > >> >> >>>>>
>> > >> >> >>>>> Thanks in advance!
>> > >> >> >
>> > >> >> > --
>> > >> >> > Best Regards,
>> > >> >> > Ruslan Al-Fakikh
