Sorry, I sent the mail only to Thejas.

Resending it to everyone.


Haitao Yao
[email protected]
weibo: @haitao_yao
Skype:  haitao.yao.final

On 2012-7-12, at 10:41 AM, Haitao Yao wrote:

> 
> 
> > Is your query using combiner ?
>       I don't know how to explicitly use the combiner.
> 
> > Can you send the explain plan output ?
>       The explain result is in the attachment. It's a little long. 
>       
> <aa.explain>
> 
> > Does the heap information say how many entries there are in the
> InternalCachedBag's ArrayList ?
>       There are 6 big ArrayLists, and the size is about 372692.
>       Here are the screen snapshots of the heap dump:
> 
>       screen snapshot 1: you can see there are 6 big POForEach instances
>       
> <aa.jpg>
> 
>               screen snapshot 2: you can see the memory is mostly retained 
> by the big array list.
>               
> <bb.jpg>
> 
>               screen snapshot 3: you can see the big array list is referenced 
> by InternalCachedBag: 
>                       
> <cc.jpg>
>       
> > What version of pig are you using?
>       pig-0.9.2. I've read the latest Pig source code on GitHub, and I 
> didn't find any improvements to InternalCachedBag.
> 
> 
> Haitao Yao
> [email protected]
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> On 2012-7-12, at 8:56 AM, Thejas Nair wrote:
> 
>> Haitao,
>> Is your query using combiner ? Can you send the explain plan output ?
>> Does the heap information say how many entries there are in the
>> InternalCachedBag's ArrayList ?
>> What version of pig are you using ?
>> 
>> 
>> Thanks,
>> Thejas
>> 
>> 
>> On 7/10/12 11:50 PM, Haitao Yao wrote:
>>> Oh, new discovery: we cannot set pig.cachedbag.memusage = 0, because 
>>> every time the InternalCachedBag spills, it creates a new tmp file in 
>>> java.io.tmpdir. If we set pig.cachedbag.memusage to 0, every new tuple 
>>> added to the InternalCachedBag creates a new tmp file, and each tmp 
>>> file is only deleted on exit.
>>> So, if you're unlucky like me, you will get an OOM exception caused by 
>>> java.io.DeleteOnExitHook!
>>> Here's the evidence:
>>> 
>>> God, we really need a full description of how every parameter works.
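
To illustrate the DeleteOnExitHook problem described above: File.deleteOnExit() records each path in a JVM-wide set that is only emptied at shutdown, so creating one temp file per tuple makes that set grow for the life of the task. Below is a minimal sketch of the alternative, deleting each spill file explicitly once it is no longer needed. This is not Pig's actual code; the class and method names are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only; not part of Pig.
final class SpillFileCleanup {

    /** Creates `count` temp files, deletes each explicitly, and returns how many were removed. */
    static int createAndDelete(int count) {
        int deleted = 0;
        for (int i = 0; i < count; i++) {
            try {
                File spill = File.createTempFile("spill", ".tmp");
                // Calling spill.deleteOnExit() here instead would keep the path
                // string registered until JVM shutdown, even after the file is gone.
                if (spill.delete()) {
                    deleted++;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        System.out.println("deleted: " + createAndDelete(1000));
    }
}
```

With explicit deletion nothing accumulates on the heap per file, at the cost of having to track when each spill file is finished with.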
>>> 
>>> 
>>> 
>>> Haitao Yao
>>> [email protected] <mailto:[email protected]>
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>> 
>>> On 2012-7-10, at 4:20 PM, Haitao Yao wrote:
>>> 
>>>> I found the solution.
>>>> 
>>>> After analyzing the heap dump taken when the reducer hit OOM, I found 
>>>> that the memory is consumed by org.apache.pig.data.InternalCachedBag. 
>>>> Here's the diagram:
>>>> <cc.jpg>
>>>> 
>>>> In the source code of org.apache.pig.data.InternalCachedBag, I found 
>>>> a parameter for the cache limit:
>>>> public InternalCachedBag(int bagCount) {
>>>>     float percent = 0.2F;
>>>> 
>>>>     if (PigMapReduce.sJobConfInternal.get() != null) {
>>>>         // here, the cache limit is from here!
>>>>         String usage =
>>>>             PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
>>>>         if (usage != null) {
>>>>             percent = Float.parseFloat(usage);
>>>>         }
>>>>     }
>>>> 
>>>>     init(bagCount, percent);
>>>> }
>>>> 
>>>> private void init(int bagCount, float percent) {
>>>>     factory = TupleFactory.getInstance();
>>>>     mContents = new ArrayList<Tuple>();
>>>> 
>>>>     long max = Runtime.getRuntime().maxMemory();
>>>>     maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
>>>>     cacheLimit = Integer.MAX_VALUE;
>>>> 
>>>>     // set limit to 0, if memusage is 0 or really really small.
>>>>     // then all tuples are put into disk
>>>>     if (maxMemUsage < 1) {
>>>>         cacheLimit = 0;
>>>>     }
>>>>     log.warn("cacheLimit: " + this.cacheLimit);
>>>>     addDone = false;
>>>> }
>>>> 
>>>> 
>>>> So, after writing pig.cachedbag.memusage=0 into 
>>>> $PIG_HOME/conf/pig.properties, my job succeeded!
>>>> 
>>>> You can also set it to an appropriate value to fully utilize your memory 
>>>> as a cache.
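
To get a feel for what "an appropriate value" means, the arithmetic from the quoted init() method can be tried in isolation. The sketch below copies only that formula; the class name, the helper names, and the choice of bagCount values are assumptions for illustration (the thread does not say what bagCount was in the failing job).

```java
// Hypothetical helper mirroring the arithmetic in the quoted init() method.
final class BagMemoryBudget {

    /** Bytes each bag may keep in memory: (maxHeap * percent) / bagCount, truncated to long. */
    static long maxMemUsage(long maxHeapBytes, float percent, int bagCount) {
        return (long) (((float) maxHeapBytes * percent) / (float) bagCount);
    }

    /** True when the budget is below 1 byte, the case where the quoted code sets cacheLimit to 0. */
    static boolean spillsEverything(long maxHeapBytes, float percent, int bagCount) {
        return maxMemUsage(maxHeapBytes, percent, bagCount) < 1;
    }

    public static void main(String[] args) {
        long heap = 512L * 1024 * 1024; // -Xmx512M, as used in this thread
        System.out.println("default 0.2, 1 bag : " + maxMemUsage(heap, 0.2F, 1));
        System.out.println("default 0.2, 6 bags: " + maxMemUsage(heap, 0.2F, 6)); // 6 is an assumed count
        System.out.println("memusage=0 spills everything: " + spillsEverything(heap, 0.0F, 6));
    }
}
```

With the default of 0.2, roughly a fifth of the heap is shared among the bags; only a value of 0 (or one so small that the result truncates to 0) triggers the all-on-disk branch.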
>>>> 
>>>> Hope this is useful for others.
>>>> Thanks.
>>>> 
>>>> 
>>>> Haitao Yao
>>>> [email protected] <mailto:[email protected]>
>>>> weibo: @haitao_yao
>>>> Skype:  haitao.yao.final
>>>> 
>>>> On 2012-7-10, at 1:06 PM, Haitao Yao wrote:
>>>> 
>>>>> My reducers get 512 MB: -Xms512M -Xmx512M.
>>>>> In my case, the reducer does not get OOM when I manually invoke spill.
>>>>> 
>>>>> Can you explain more about your solution?
>>>>> And can your solution fit into a 512 MB reducer process?
>>>>> Thanks very much.
>>>>> 
>>>>> 
>>>>> 
>>>>> Haitao Yao
>>>>> [email protected] <mailto:[email protected]>
>>>>> weibo: @haitao_yao
>>>>> Skype:  haitao.yao.final
>>>>> 
>>>>> On 2012-7-10, at 12:26 PM, Jonathan Coveney wrote:
>>>>> 
>>>>>> I have something in the mix that should reduce bag memory :) 
>>>>>> Question: how much memory are your reducers getting? In my experience,
>>>>>> you'll get OOMs on spilling if you have allocated less than a gig to
>>>>>> the JVM.
>>>>>> 
>>>>>> 2012/7/9 Haitao Yao <[email protected] <mailto:[email protected]>>
>>>>>> 
>>>>>>> I have encountered a similar problem: I got an OOM while running the
>>>>>>> reducer.
>>>>>>> I think the reason is that the data bag generated after group all is
>>>>>>> too big to fit into the reducer's memory.
>>>>>>> 
>>>>>>> I have written a new COUNT implementation that explicitly invokes
>>>>>>> spill and System.gc() after the COUNT function finishes its job, but it
>>>>>>> still gets OOM.
>>>>>>> 
>>>>>>> here's the code of the new COUNT implementation:
>>>>>>>       @Override
>>>>>>>       public Long exec(Tuple input) throws IOException {
>>>>>>>               DataBag bag = (DataBag)input.get(0);
>>>>>>>               Long result = super.exec(input);
>>>>>>>               LOG.warn(" before spill data bag memory : "
>>>>>>>                               + Runtime.getRuntime().freeMemory());
>>>>>>>               bag.spill();
>>>>>>>               System.gc();
>>>>>>>               LOG.warn(" after spill data bag memory : "
>>>>>>>                               + Runtime.getRuntime().freeMemory());
>>>>>>>               LOG.warn("big bag size: " + bag.size()
>>>>>>>                               + ", hashcode: " + bag.hashCode());
>>>>>>>               return result;
>>>>>>>       }
>>>>>>> 
>>>>>>> 
>>>>>>> I think we have to redesign the data bag implementation so that it
>>>>>>> consumes less memory.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Haitao Yao
>>>>>>> [email protected] <mailto:[email protected]>
>>>>>>> weibo: @haitao_yao
>>>>>>> Skype:  haitao.yao.final
>>>>>>> 
>>>>>>> On 2012-7-10, at 6:54 AM, Sheng Guo wrote:
>>>>>>> 
>>>>>>>> the pig script:
>>>>>>>> 
>>>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>>> 
>>>>>>>> grpall = group longDesc all;
>>>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>>>> explain cnt;
>>>>>>>> 
>>>>>>>> 
>>>>>>>> the dump relation result:
>>>>>>>> 
>>>>>>>> #-----------------------------------------------
>>>>>>>> # New Logical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>>>> |
>>>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>>>  |   |
>>>>>>>>  |   (Name: LOGenerate[false] Schema:
>>>>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>>>  |   |   |
>>>>>>>>  |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 65)
>>>>>>>>  |   |   |
>>>>>>>>  |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column: (*))
>>>>>>>>  |   |
>>>>>>>>  |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>>>  |
>>>>>>>>  |---grpall: (Name: LOCogroup Schema:
>>>>>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>>>      |   |
>>>>>>>>      |   (Name: Constant Type: chararray Uid: 62)
>>>>>>>>      |
>>>>>>>>      |---longDesc: (Name: LOLoad Schema:
>>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>>> 
>>>>>>>> #-----------------------------------------------
>>>>>>>> # Physical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-5
>>>>>>>>  |
>>>>>>>>  |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>>>      |
>>>>>>>>      |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>>>          |
>>>>>>>>          |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>>>>>>              |   |
>>>>>>>>              |   Constant(all) - scope-4
>>>>>>>>              |
>>>>>>>>              |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>>> 
>>>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler
>>>>>>>> - File concatenation threshold: 100 optimistic? false
>>>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size before optimization: 1
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size after optimization: 1
>>>>>>>> #--------------------------------------------------
>>>>>>>> # Map Reduce Plan
>>>>>>>> #--------------------------------------------------
>>>>>>>> MapReduce node scope-10
>>>>>>>> Map Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-23
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>>>  |   |
>>>>>>>>  |   Project[chararray][0] - scope-12
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-14
>>>>>>>>  |
>>>>>>>>  |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>>>      |
>>>>>>>>      |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
>>>>>>>> Combine Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-27
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>>>  |   |
>>>>>>>>  |   Project[chararray][0] - scope-16
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-18
>>>>>>>>  |
>>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>>>>> Reduce Plan
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-19
>>>>>>>>  |
>>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>>>>> Global sort: false
>>>>>>>> ----------------
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney 
>>>>>>>> <[email protected]> wrote:
>>>>>>>> 
>>>>>>>>> Instead of doing "dump relation," do "explain relation" (then run
>>>>>>>>> identically) and paste the output here. It will show whether the
>>>>>>>>> combiner is being used.
>>>>>>>>> 
>>>>>>>>> 2012/7/3 Ruslan Al-Fakikh <[email protected]>
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> As was said, COUNT is algebraic and should be fast, because it
>>>>>>>>>> forces the combiner. You should make sure that the combiner is really
>>>>>>>>>> used here. It can be disabled in some situations. I've encountered such
>>>>>>>>>> situations many times, where a job is too heavy because no combiner is
>>>>>>>>>> applied.
>>>>>>>>>> 
>>>>>>>>>> Ruslan
>>>>>>>>>> 
>>>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S 
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>> Right!!
>>>>>>>>>>> 
>>>>>>>>>>> Since it is mentioned that the job is hanging, my wild guess is that it
>>>>>>>>>>> must be the 'group all'. How can that be confirmed?
>>>>>>>>>>> 
>>>>>>>>>>> On 7/3/12, Jonathan Coveney <[email protected]> wrote:
>>>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and as such
>>>>>>>>>>>> will use combiners, so it is generally quite fast.
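
The reason an algebraic function tolerates the single reducer can be sketched outside Pig. The stage names below follow the COUNT$Initial, COUNT$Intermediate and COUNT$Final operators visible in the explain output earlier in this thread; everything else (class name, method names, using plain lists in place of bags) is an assumption for illustration, and the sketch ignores COUNT's handling of null fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an algebraic count; not Pig's implementation.
final class AlgebraicCountSketch {

    /** Initial stage: reduce one map-side chunk of records to a partial count. */
    static long initial(List<String> chunk) {
        return chunk.size();
    }

    /** Intermediate and Final stages: add partial counts together. */
    static long sum(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    /** Counts all records by combining per-chunk partial counts. */
    static long countAll(List<List<String>> chunks) {
        List<Long> partials = new ArrayList<>();
        for (List<String> chunk : chunks) {
            partials.add(initial(chunk));
        }
        return sum(partials);
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d", "e"),
                Arrays.asList("f"));
        System.out.println("total records: " + countAll(chunks));
    }
}
```

When the combiner is applied, the lone reducer only has to add up a handful of partial counts; when it is not, the reducer receives every record in one bag, which is the situation the later messages in this thread run into.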
>>>>>>>>>>>> 
>>>>>>>>>>>> 2012/7/2 Subir S <[email protected]>
>>>>>>>>>>>> 
>>>>>>>>>>>>> Group all uses a single reducer, AFAIU. You could try counting per
>>>>>>>>>>>>> group and then summing, maybe.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> You may also try COUNT_STAR to include NULL fields.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 7/3/12, Sheng Guo <[email protected]> wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have been using the following Pig script to count
>>>>>>>>>>>>>> records.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> but sometimes, when the number of records gets larger, it takes a
>>>>>>>>>>>>>> lot of time and hangs, and/or dies.
>>>>>>>>>>>>>> I thought counting should be simple enough, so what is the best way
>>>>>>>>>>>>>> to do counting in Pig?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sheng
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Best Regards,
>>>>>>>>>> Ruslan Al-Fakikh
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
> 
