Haitao,
Is your query using the combiner? Can you send the explain plan output?
Does the heap information say how many entries there are in the
InternalCachedBag's ArrayList? What version of Pig are you using?
Thanks,
Thejas

On 7/10/12 11:50 PM, Haitao Yao wrote:
> Oh, new discovery: we cannot set pig.cachedbag.memusage = 0, because
> every time the InternalCachedBag spills, it creates a new tmp file in
> java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple
> added into an InternalCachedBag creates a new tmp file, and the tmp
> files are only deleted on exit.
> So, if you're unlucky like me, you will get an OOM exception caused by
> java.io.DeleteOnExitHook!
> Here's the evidence:
>
> God, we really need a full description of how every parameter works.
>
> Haitao Yao
> [email protected]
> weibo: @haitao_yao
> Skype: haitao.yao.final
>
> On Jul 10, 2012, at 4:20 PM, Haitao Yao wrote:
>
>> I found the solution.
>>
>> After analyzing the heap dump taken while the reducer OOMed, I found
>> that the memory is consumed by org.apache.pig.data.InternalCachedBag;
>> here's the diagram:
>> <cc.jpg>
>>
>> In the source code of org.apache.pig.data.InternalCachedBag, I found
>> the parameter that controls the cache limit:
>>
>> public InternalCachedBag(int bagCount) {
>>     float percent = 0.2F;
>>
>>     if (PigMapReduce.sJobConfInternal.get() != null) {
>>         // here, the cache limit is read from here!
>>         String usage = PigMapReduce.sJobConfInternal.get()
>>                 .get("pig.cachedbag.memusage");
>>         if (usage != null) {
>>             percent = Float.parseFloat(usage);
>>         }
>>     }
>>
>>     init(bagCount, percent);
>> }
>>
>> private void init(int bagCount, float percent) {
>>     factory = TupleFactory.getInstance();
>>     mContents = new ArrayList<Tuple>();
>>
>>     long max = Runtime.getRuntime().maxMemory();
>>     maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
>>     cacheLimit = Integer.MAX_VALUE;
>>
>>     // set limit to 0 if memusage is 0 or really, really small;
>>     // then all tuples are put onto disk
>>     if (maxMemUsage < 1) {
>>         cacheLimit = 0;
>>     }
>>     log.warn("cacheLimit: " + this.cacheLimit);
>>     addDone = false;
>> }
>>
>> So, after writing pig.cachedbag.memusage=0 into
>> $PIG_HOME/conf/pig.properties, my job succeeds!
>>
>> You can also set it to an appropriate value to fully utilize your
>> memory as a cache.
>>
>> Hope this is useful for others.
>> Thanks.
>>
>> Haitao Yao
>> [email protected]
>> weibo: @haitao_yao
>> Skype: haitao.yao.final
>>
>> On Jul 10, 2012, at 1:06 PM, Haitao Yao wrote:
>>
>>> My reducers get 512 MB: -Xms512M -Xmx512M.
>>> The reducer does not get an OOM when I manually invoke spill in my case.
>>>
>>> Can you explain more about your solution?
>>> And can your solution fit into a 512 MB reducer process?
>>> Thanks very much.
>>>
>>> Haitao Yao
>>> [email protected]
>>> weibo: @haitao_yao
>>> Skype: haitao.yao.final
>>>
>>> On Jul 10, 2012, at 12:26 PM, Jonathan Coveney wrote:
>>>
>>>> I have something in the mix that should reduce bag memory :)
>>>> Question: how much memory are your reducers getting? In my
>>>> experience, you'll get OOMs on spilling if you have allocated less
>>>> than a gig to the JVM.
>>>>
>>>> 2012/7/9 Haitao Yao <[email protected]>
>>>>
>>>>> I have encountered a similar problem, and I got an OOM while
>>>>> running the reducer.
>>>>> I think the reason is that the data bag generated after "group all"
>>>>> is too big to fit into the reducer's memory.
>>>>>
>>>>> I have also written a new COUNT implementation that explicitly
>>>>> invokes System.gc() and spill() after the COUNT function finishes
>>>>> its job, but it still gets an OOM.
>>>>>
>>>>> Here's the code of the new COUNT implementation:
>>>>>
>>>>> @Override
>>>>> public Long exec(Tuple input) throws IOException {
>>>>>     DataBag bag = (DataBag) input.get(0);
>>>>>     Long result = super.exec(input);
>>>>>     LOG.warn("before spill data bag memory: "
>>>>>             + Runtime.getRuntime().freeMemory());
>>>>>     bag.spill();
>>>>>     System.gc();
>>>>>     LOG.warn("after spill data bag memory: "
>>>>>             + Runtime.getRuntime().freeMemory());
>>>>>     LOG.warn("big bag size: " + bag.size()
>>>>>             + ", hashcode: " + bag.hashCode());
>>>>>     return result;
>>>>> }
>>>>>
>>>>> I think we have to redesign the data bag implementation to consume
>>>>> less memory.
>>>>>
>>>>> Haitao Yao
>>>>> [email protected]
>>>>> weibo: @haitao_yao
>>>>> Skype: haitao.yao.final
>>>>>
>>>>> On Jul 10, 2012, at 6:54 AM, Sheng Guo wrote:
>>>>>
>>>>>> The pig script:
>>>>>>
>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>> grpall = group longDesc all;
>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>> explain cnt;
>>>>>>
>>>>>> The explain output:
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # New Logical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>> |
>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>> |   |
>>>>>> |   (Name: LOGenerate[false] Schema: allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>> |   |   |
>>>>>> |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 65)
>>>>>> |   |   |
>>>>>> |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column: (*))
>>>>>> |   |
>>>>>> |   |---longDesc: (Name: LOInnerLoad[1] Schema: DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>> |
>>>>>> |---grpall: (Name: LOCogroup Schema: group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>> |   |
>>>>>> |   (Name: Constant Type: chararray Uid: 62)
>>>>>> |
>>>>>> |---longDesc: (Name: LOLoad Schema: DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # Physical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>> |   |
>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>> |   |
>>>>>> |   |---Project[bag][1] - scope-5
>>>>>> |
>>>>>> |---grpall: Package[tuple]{chararray} - scope-2
>>>>>> |
>>>>>> |---grpall: Global Rearrange[tuple] - scope-1
>>>>>> |
>>>>>> |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>>>> |   |
>>>>>> |   Constant(all) - scope-4
>>>>>> |
>>>>>> |---longDesc: Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>
>>>>>> 2012-07-09 15:47:02,441 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
>>>>>> 2012-07-09 15:47:02,448 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
>>>>>> 2012-07-09 15:47:02,581 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
>>>>>> 2012-07-09 15:47:02,581 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
>>>>>>
>>>>>> #--------------------------------------------------
>>>>>> # Map Reduce Plan
>>>>>> #--------------------------------------------------
>>>>>> MapReduce node scope-10
>>>>>> Map Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-23
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-12
>>>>>> |   |
>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>>>> |   |
>>>>>> |   |---Project[bag][1] - scope-14
>>>>>> |
>>>>>> |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>> |
>>>>>> |---longDesc: Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>> --------
>>>>>> Combine Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-27
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-16
>>>>>> |   |
>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
>>>>>> |   |
>>>>>> |   |---Project[bag][1] - scope-18
>>>>>> |
>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-20
>>>>>> --------
>>>>>> Reduce Plan
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>> |   |
>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>> |   |
>>>>>> |   |---Project[bag][1] - scope-19
>>>>>> |
>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-28
>>>>>> --------
>>>>>> Global sort: false
>>>>>> ----------------
>>>>>>
>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <[email protected]> wrote:
>>>>>>
>>>>>>> Instead of doing "dump relation," do "explain relation" (then run
>>>>>>> identically) and paste the output here. It will show whether the
>>>>>>> combiner is being used.
>>>>>>>
>>>>>>> 2012/7/3 Ruslan Al-Fakikh <[email protected]>
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>>>> forces the combiner. You should make sure that the combiner is
>>>>>>>> really used here; it can be disabled in some situations. I've
>>>>>>>> encountered such situations many times, when a job is too heavy
>>>>>>>> because no combiner is applied.
>>>>>>>>
>>>>>>>> Ruslan
>>>>>>>>
>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <[email protected]> wrote:
>>>>>>>>> Right!!
>>>>>>>>>
>>>>>>>>> Since it is mentioned that the job is hanging, my wild guess is
>>>>>>>>> it must be 'group all'. How can that be confirmed?
>>>>>>>>>
>>>>>>>>> On 7/3/12, Jonathan Coveney <[email protected]> wrote:
>>>>>>>>>> "group all" uses a single reducer, but COUNT is algebraic and,
>>>>>>>>>> as such, will use combiners, so it is generally quite fast.
>>>>>>>>>>
>>>>>>>>>> 2012/7/2 Subir S <[email protected]>
>>>>>>>>>>
>>>>>>>>>>> "group all" uses a single reducer, AFAIU. You can try to
>>>>>>>>>>> count per group and then sum, maybe.
>>>>>>>>>>>
>>>>>>>>>>> You may also try COUNT_STAR to include NULL fields.
>>>>>>>>>>>
>>>>>>>>>>> On 7/3/12, Sheng Guo <[email protected]> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I used to use the following pig script to do the counting of
>>>>>>>>>>>> the records:
>>>>>>>>>>>>
>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>
>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>
>>>>>>>>>>>> But sometimes, when the records get larger, it takes lots of
>>>>>>>>>>>> time and hangs, or dies.
>>>>>>>>>>>> I thought counting should be simple enough, so what is the
>>>>>>>>>>>> best way to do a count in pig?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> Sheng
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Ruslan Al-Fakikh
>>>>>>>
>>>>>
>>>>>
>>>
>>
>
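The budget arithmetic in the InternalCachedBag.init() code quoted earlier in the thread explains both Haitao's fix and the failure mode. Here is a minimal, self-contained sketch of that calculation; the class and method names are illustrative, not the actual Pig source:

```java
// Sketch of how InternalCachedBag derives its per-bag memory budget from
// pig.cachedbag.memusage (per the init() code quoted in the thread).
// Class/method names here are illustrative, not Pig's.
public class CacheLimitSketch {

    // maxMemUsage = heap size * memusage fraction / number of bags sharing it
    public static long maxMemUsage(long maxHeapBytes, float percent, int bagCount) {
        return (long) (((float) maxHeapBytes * percent) / (float) bagCount);
    }

    // When the per-bag budget is below one byte, the in-memory cache is
    // disabled entirely: cacheLimit = 0 means every added tuple is spilled.
    public static int cacheLimit(long maxMemUsage) {
        return maxMemUsage < 1 ? 0 : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long heap = 512L * 1024 * 1024;  // a 512 MB reducer, as in the thread

        // With the default memusage of 0.2, four concurrent bags would each
        // get roughly a quarter of 20% of the heap, i.e. around 26 MB.
        long budget = maxMemUsage(heap, 0.2f, 4);
        if (budget < 20L * 1024 * 1024 || budget > 30L * 1024 * 1024)
            throw new AssertionError("unexpected budget: " + budget);

        // pig.cachedbag.memusage=0 zeroes the budget, so cacheLimit becomes 0
        // and every tuple goes straight to disk.
        if (cacheLimit(maxMemUsage(heap, 0.0f, 4)) != 0)
            throw new AssertionError("expected cacheLimit 0");

        System.out.println("per-bag budget at memusage=0.2: " + budget + " bytes");
    }
}
```

This is why setting the property to 0 avoids the heap blowup in the bag itself, at the cost of spilling every tuple to disk.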

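The java.io.DeleteOnExitHook OOM that Haitao reports has a simple mechanism: File.deleteOnExit() records each path in a JVM-internal set that is only drained at shutdown, so a bag that spills to a fresh tmp file per tuple grows that set, and the heap, without bound. A small sketch of the effect; the file prefix and helper method are hypothetical, not Pig code:

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Each deleteOnExit() call registers the path with java.io.DeleteOnExitHook,
// which keeps it in memory until JVM exit. The "pigbag" prefix and this
// helper are illustrative only.
public class DeleteOnExitGrowth {

    // Mimics a spill: one fresh tmp file per call, registered for delete-on-exit.
    public static List<File> createSpills(int n) throws IOException {
        List<File> spills = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            File f = File.createTempFile("pigbag", ".tmp");
            f.deleteOnExit();  // path retained in memory until JVM exit
            spills.add(f);
        }
        return spills;
    }

    public static void main(String[] args) throws IOException {
        List<File> spills = createSpills(5);
        // Nothing is reclaimed before shutdown: all files (and their
        // registered paths) still exist.
        for (File f : spills) {
            if (!f.exists()) throw new AssertionError("tmp file missing");
        }
        // Clean up eagerly here; a long-running reducer spilling per tuple
        // cannot, which is why memusage=0 trades heap for unbounded
        // tmp-file tracking.
        for (File f : spills) f.delete();
        System.out.println("tracked " + spills.size() + " spill files");
    }
}
```

With one spill file per tuple, a reducer processing millions of tuples accumulates millions of these registrations before exit, which matches the OOM Haitao observed.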