Sorry, I sent the mail only to Thejas. Resending it to all.
Haitao Yao [email protected] weibo: @haitao_yao Skype: haitao.yao.final 在 2012-7-12,上午10:41, Haitao Yao 写道: > > > > Is your query using combiner ? > I did know how to explicitly use combiner. > > > Can you send the explain plan output ? > The explain result is in the attachment. It's a little long. > > <aa.explain> > > > Does the heap information say how many entries are there in the > InteralCachedBag's ArrayList ? > There's 6 big Array lists, and the size is about 372692 > Here's the screen snapshot of the heap dump: > > screen snapshot 1: you can see there's 6 big POForeEach instances > > <aa.jpg> > > screen snapshot 2: you can see the memory are mostly retained > by the big array list. > > <bb.jpg> > > screen snapshot 3: you can see the big array list is referenced > by InternalCachedBag: > > <cc.jpg> > > > What version of pig are you using? > pig-0.9.2, I've read the latest source code of pig from github, and I > don't find any improvements on IntercalCachedBag. > > > Haitao Yao > [email protected] > weibo: @haitao_yao > Skype: haitao.yao.final > > 在 2012-7-12,上午8:56, Thejas Nair 写道: > >> Haitao, >> Is your query using combiner ? Can you send the explain plan output ? >> Does the heap information say how many entries are there in the >> InteralCachedBag's ArrayList ? >> What version of pig are you using ? >> >> >> Thanks, >> Thejas >> >> >> On 7/10/12 11:50 PM, Haitao Yao wrote: >>> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because >>> every time the InternalCachedBag spills, It creates a new tmp file in >>> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple >>> added into InternalCachedBag will create a new tmp file. And the tmp >>> file is deleted on exit. >>> So , if you're unlucky like me, you will get a OOM Exception caused by >>> java.io.DeleteOnExitHook! >>> Here's the evidence: >>> >>> God, we really need a full description of how every parameter works. >>> >>> >>> >>> Haitao Yao >>> [email protected] <mailto:[email protected]> >>> weibo: @haitao_yao >>> Skype: haitao.yao.final >>> >>> 在 2012-7-10,下午4:20, Haitao Yao 写道: >>> >>>> I found the solution. >>>> >>>> After analyzing the heap dump while the reducer OOM, I found out the >>>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's >>>> the diagram: >>>> <cc.jpg> >>>> >>>> In the source code of org.apache.pig.data.InternalCachedBag, I found >>>> out there's a parameter for the cache limit: >>>> *public* InternalCachedBag(*int* bagCount) { >>>> *float* percent = 0.2F; >>>> >>>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) { >>>> // here, the cache limit is from here! >>>> String usage = >>>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage"); >>>> *if* (usage != *null*) { >>>> percent = Float./parseFloat/(usage); >>>> } >>>> } >>>> >>>> init(bagCount, percent); >>>> } >>>> *private* *void* init(*int* bagCount, *float* percent) { >>>> factory = TupleFactory./getInstance/(); >>>> mContents = *new* ArrayList<Tuple>(); >>>> >>>> *long* max = Runtime./getRuntime/().maxMemory(); >>>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount); >>>> cacheLimit = Integer./MAX_VALUE/; >>>> >>>> // set limit to 0, if memusage is 0 or really really small. >>>> // then all tuples are put into disk >>>> *if* (maxMemUsage < 1) { >>>> cacheLimit = 0; >>>> } >>>> /log/.warn("cacheLimit: " + *this*.cacheLimit); >>>> addDone = *false*; >>>> } >>>> >>>> so, after write pig.cachedbag.memusage=0 into >>>> $PIG_HOME/conf/pig.properties, my job successes! 
>>>>
>>>> You can also set it to an appropriate value to fully utilize your
>>>> memory as a cache.
>>>>
>>>> Hope this is useful for others.
>>>> Thanks.
>>>>
>>>>
>>>> Haitao Yao
>>>> [email protected] <mailto:[email protected]>
>>>> weibo: @haitao_yao
>>>> Skype: haitao.yao.final
>>>>
>>>> On 2012-7-10, at 1:06 PM, Haitao Yao wrote:
>>>>
>>>>> My reducers get 512 MB: -Xms512M -Xmx512M.
>>>>> The reducer does not get an OOM when spill is invoked manually, in
>>>>> my case.
>>>>>
>>>>> Can you explain more about your solution?
>>>>> And can your solution fit into a 512 MB reducer process?
>>>>> Thanks very much.
>>>>>
>>>>>
>>>>>
>>>>> Haitao Yao
>>>>> [email protected] <mailto:[email protected]>
>>>>> weibo: @haitao_yao
>>>>> Skype: haitao.yao.final
>>>>>
>>>>> On 2012-7-10, at 12:26 PM, Jonathan Coveney wrote:
>>>>>
>>>>>> I have something in the mix that should reduce bag memory :)
>>>>>> Question: how much memory are your reducers getting? In my
>>>>>> experience, you'll get OOMs on spilling if you have allocated less
>>>>>> than a gig to the JVM.
>>>>>>
>>>>>> 2012/7/9 Haitao Yao <[email protected] <mailto:[email protected]>>
>>>>>>
>>>>>>> I have encountered a similar problem, and I got an OOM while
>>>>>>> running the reducer.
>>>>>>> I think the reason is that the data bag generated after "group all"
>>>>>>> is too big to fit into the reducer's memory.
>>>>>>>
>>>>>>> I have written a new COUNT implementation that explicitly invokes
>>>>>>> System.gc() and spill after the COUNT function finishes its job,
>>>>>>> but it still gets an OOM.
>>>>>>>
>>>>>>> Here's the code of the new COUNT implementation:
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Long exec(Tuple input) throws IOException {
>>>>>>>         DataBag bag = (DataBag) input.get(0);
>>>>>>>         Long result = super.exec(input);
>>>>>>>         LOG.warn("before spill, free memory: "
>>>>>>>                 + Runtime.getRuntime().freeMemory());
>>>>>>>         bag.spill();
>>>>>>>         System.gc();
>>>>>>>         LOG.warn("after spill, free memory: "
>>>>>>>                 + Runtime.getRuntime().freeMemory());
>>>>>>>         LOG.warn("big bag size: " + bag.size()
>>>>>>>                 + ", hashcode: " + bag.hashCode());
>>>>>>>         return result;
>>>>>>>     }
>>>>>>>
>>>>>>> I think we have to redesign the data bag implementation to consume
>>>>>>> less memory.
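A minimal sketch following Jonathan's rule of thumb above (at least a gig for the task JVM); the property name assumes a Hadoop 1.x-era cluster, and the values are illustrative:

    -- give the task JVMs a full gig so spilling has headroom (hypothetical value)
    set mapred.child.java.opts '-Xmx1024m';
    -- and keep the default bag cache fraction rather than 0
    set pig.cachedbag.memusage 0.2;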
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Haitao Yao
>>>>>>> [email protected] <mailto:[email protected]>
>>>>>>> weibo: @haitao_yao
>>>>>>> Skype: haitao.yao.final
>>>>>>>
>>>>>>> On 2012-7-10, at 6:54 AM, Sheng Guo wrote:
>>>>>>>
>>>>>>>> The pig script:
>>>>>>>>
>>>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>>>
>>>>>>>> grpall = group longDesc all;
>>>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>>>> explain cnt;
>>>>>>>>
>>>>>>>>
>>>>>>>> The explain result for the relation:
>>>>>>>>
>>>>>>>> #-----------------------------------------------
>>>>>>>> # New Logical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>>>> |
>>>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>>> |   |
>>>>>>>> |   (Name: LOGenerate[false] Schema:
>>>>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>>> |   |   |
>>>>>>>> |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 65)
>>>>>>>> |   |   |
>>>>>>>> |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column: (*))
>>>>>>>> |   |
>>>>>>>> |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>>> |
>>>>>>>> |---grpall: (Name: LOCogroup Schema:
>>>>>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>>> |   |
>>>>>>>> |   (Name: Constant Type: chararray Uid: 62)
>>>>>>>> |
>>>>>>>> |---longDesc: (Name: LOLoad Schema:
>>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>>>
>>>>>>>> #-----------------------------------------------
>>>>>>>> # Physical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>> |   |
>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>>> |   |
>>>>>>>> |   |---Project[bag][1] - scope-5
>>>>>>>> |
>>>>>>>> |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>>> |
>>>>>>>> |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>>> |
>>>>>>>> |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>>>>>> |   |
>>>>>>>> |   Constant(all) - scope-4
>>>>>>>> |
>>>>>>>> |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>>>
>>>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler
>>>>>>>> - File concatenation threshold: 100 optimistic? false
>>>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size before optimization: 1
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size after optimization: 1
>>>>>>>>
>>>>>>>> #--------------------------------------------------
>>>>>>>> # Map Reduce Plan
>>>>>>>> #--------------------------------------------------
>>>>>>>> MapReduce node scope-10
>>>>>>>> Map Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-23
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-12
>>>>>>>> |   |
>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>>>>>> |   |
>>>>>>>> |   |---Project[bag][1] - scope-14
>>>>>>>> |
>>>>>>>> |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>>> |
>>>>>>>> |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>>> --------
>>>>>>>> Combine Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-27
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-16
>>>>>>>> |   |
>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
>>>>>>>> |   |
>>>>>>>> |   |---Project[bag][1] - scope-18
>>>>>>>> |
>>>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-20
>>>>>>>> --------
>>>>>>>> Reduce Plan
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>> |   |
>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>>>> |   |
>>>>>>>> |   |---Project[bag][1] - scope-19
>>>>>>>> |
>>>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-28
>>>>>>>> --------
>>>>>>>> Global sort: false
>>>>>>>> ----------------
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney
>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>
>>>>>>>>> Instead of doing "dump relation", do "explain relation" (then run
>>>>>>>>> it identically) and paste the output here. It will show whether
>>>>>>>>> the combiner is being used.
>>>>>>>>>
>>>>>>>>> 2012/7/3 Ruslan Al-Fakikh <[email protected]
>>>>>>>>> <mailto:[email protected]>>
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> As was said, COUNT is algebraic and should be fast, because it
>>>>>>>>>> forces the combiner. You should make sure that the combiner is
>>>>>>>>>> really used here; it can be disabled in some situations. I've
>>>>>>>>>> encountered such situations many times, where a job is too heavy
>>>>>>>>>> because no combiner is applied.
>>>>>>>>>>
>>>>>>>>>> Ruslan
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S
>>>>>>>>>> <[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>> Right!!
>>>>>>>>>>>
>>>>>>>>>>> Since it is mentioned that the job is hanging, a wild guess is
>>>>>>>>>>> that it must be the 'group all'. How can that be confirmed?
>>>>>>>>>>>
>>>>>>>>>>> On 7/3/12, Jonathan Coveney <[email protected]
>>>>>>>>>>> <mailto:[email protected]>> wrote:
>>>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and as
>>>>>>>>>>>> such will use combiners, so it is generally quite fast.
>>>>>>>>>>>>
>>>>>>>>>>>> 2012/7/2 Subir S <[email protected]
>>>>>>>>>>>> <mailto:[email protected]>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Group all uses a single reducer, AFAIU. You can try to count
>>>>>>>>>>>>> per group and then sum, maybe.
>>>>>>>>>>>>>
>>>>>>>>>>>>> You may also try COUNT_STAR to include NULL fields.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 7/3/12, Sheng Guo <[email protected]
>>>>>>>>>>>>> <mailto:[email protected]>> wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I used to use the following pig script to do the counting of
>>>>>>>>>>>>>> the records:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But sometimes, when the records get larger, it takes a lot of
>>>>>>>>>>>>>> time and hangs, or dies.
>>>>>>>>>>>>>> I thought counting should be simple enough, so what is the
>>>>>>>>>>>>>> best way to do a count in Pig?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sheng
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards,
>>>>>>>>>> Ruslan Al-Fakikh
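A minimal Pig sketch of Subir's count-per-group-then-sum idea, assuming the relation names from Sheng's script above; COUNT_STAR is used so null fields are counted too:

    -- count records per member first, then sum the small per-member counts,
    -- so the single 'group all' reducer never materializes one giant bag
    grp_by_member = group m_skills_filter by member_id;
    per_member    = foreach grp_by_member generate COUNT_STAR(m_skills_filter) as c;
    grp_all       = group per_member all;
    total         = foreach grp_all generate SUM(per_member.c);
    -- COUNT_STAR and SUM are both algebraic, so the combiner pre-aggregates
    -- on the map side and the final reducer only adds a few numbers

Note that this sketch counts total records, whereas Sheng's original script counts the number of member_id groups.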
