Haitao,
Is your query using combiner ? Can you send the explain plan output ?
Does the heap information say how many entries are there in the
InteralCachedBag's ArrayList ?
What version of pig are you using ?


Thanks,
Thejas


On 7/10/12 11:50 PM, Haitao Yao wrote:
> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because 
> every time the InternalCachedBag spills, It creates a new tmp file in 
> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple 
> added into InternalCachedBag will create a new tmp file. And the tmp 
> file is deleted on exit.
> So , if you're unlucky like me, you will get a OOM Exception caused by 
> java.io.DeleteOnExitHook!
> Here's the evidence:
> 
> God, we really need a full description of how every parameter works.
> 
> 
> 
> Haitao Yao
> [email protected] <mailto:[email protected]>
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> 在 2012-7-10,下午4:20, Haitao Yao 写道:
> 
>> I found the solution.
>>
>> After analyzing the heap dump while the reducer OOM, I found out the 
>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's 
>> the diagram:
>> <cc.jpg>
>>
>> In the source code of org.apache.pig.data.InternalCachedBag, I found 
>> out there's a parameter for the cache limit:
>> *public* InternalCachedBag(*int* bagCount) {
>> *float* percent = 0.2F;
>>
>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) {
>> // here, the cache limit is from here!
>> String usage = 
>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage");
>> *if* (usage != *null*) {
>> percent = Float./parseFloat/(usage);
>> }
>> }
>>
>>         init(bagCount, percent);
>>     }
>> *private* *void* init(*int* bagCount, *float* percent) {
>> factory = TupleFactory./getInstance/();
>> mContents = *new* ArrayList<Tuple>();
>>
>> *long* max = Runtime./getRuntime/().maxMemory();
>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount);
>> cacheLimit = Integer./MAX_VALUE/;
>>
>> // set limit to 0, if memusage is 0 or really really small.
>> // then all tuples are put into disk
>> *if* (maxMemUsage < 1) {
>> cacheLimit = 0;
>>         }
>> /log/.warn("cacheLimit: " + *this*.cacheLimit);
>> addDone = *false*;
>>     }
>>
>> so, after write pig.cachedbag.memusage=0 into 
>> $PIG_HOME/conf/pig.properties, my job successes!
>>
>> You can also set to an appropriate value to fully utilize your memory 
>> as a cache.
>>
>> Hope this is useful for others.
>> Thanks.
>>
>>
>> Haitao Yao
>> [email protected] <mailto:[email protected]>
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>>
>> 在 2012-7-10,下午1:06, Haitao Yao 写道:
>>
>>> my reducers get 512 MB, -Xms512M -Xmx512M.
>>> The reducer does not get OOM when manually invoke spill in my case.
>>>
>>> Can you explain more about your solution?
>>> And can your solution fit into 512MB reducer process?
>>> Thanks very much.
>>>
>>>
>>>
>>> Haitao Yao
>>> [email protected] <mailto:[email protected]>
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>>
>>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>>>
>>>> I have something in the mix that should reduce bag memory :) 
>>>> Question: how
>>>> much memory are your reducers getting? In my experience, you'll get 
>>>> OOM's
>>>> on spilling if you have allocated less than a gig to the JVM
>>>>
>>>> 2012/7/9 Haitao Yao <[email protected] <mailto:[email protected]>>
>>>>
>>>>> I have encountered the similar problem.  And I got a OOM while 
>>>>> running the
>>>>> reducer.
>>>>> I think the reason is the data bag generated after group all is too 
>>>>> big to
>>>>> fit into the reducer's memory.
>>>>>
>>>>> and I have written a new COUNT implementation with explicit invoke
>>>>> System.gc() and spill  after the COUNT function finish its job, but it
>>>>> still get OOM
>>>>>
>>>>> here's the code of the new COUNT implementation:
>>>>>        @Override
>>>>>        public Long exec(Tuple input) throws IOException {
>>>>>                DataBag bag = (DataBag)input.get(0);
>>>>>                Long result = super.exec(input);
>>>>>                LOG.warn(" before spill data bag memory : " +
>>>>> Runtime.getRuntime().freeMemory());
>>>>>                bag.spill();
>>>>>                System.gc();
>>>>>                LOG.warn(" after spill data bag memory : " +
>>>>> Runtime.getRuntime().freeMemory());
>>>>>                LOG.warn("big bag size: " + bag.size() + ", 
>>>>> hashcode: " +
>>>>> bag.hashCode());
>>>>>                return result;
>>>>>        }
>>>>>
>>>>>
>>>>> I think we have to redesign the data bag implementation with less 
>>>>> memory
>>>>> consumed.
>>>>>
>>>>>
>>>>>
>>>>> Haitao Yao
>>>>> [email protected] <mailto:[email protected]>
>>>>> weibo: @haitao_yao
>>>>> Skype:  haitao.yao.final
>>>>>
>>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>>>>
>>>>>> the pig script:
>>>>>>
>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>
>>>>>> grpall = group longDesc all;
>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>> explain cnt;
>>>>>>
>>>>>>
>>>>>> the dump relation result:
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # New Logical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>> |
>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>   |   |
>>>>>>   |   (Name: LOGenerate[false] Schema:
>>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>   |   |   |
>>>>>>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long 
>>>>>> Uid:
>>>>>> 65)
>>>>>>   |   |   |
>>>>>>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 
>>>>>> Column:
>>>>>> (*))
>>>>>>   |   |
>>>>>>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>
>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>   |
>>>>>>   |---grpall: (Name: LOCogroup Schema:
>>>>>>
>>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>       |   |
>>>>>>       |   (Name: Constant Type: chararray Uid: 62)
>>>>>>       |
>>>>>>       |---longDesc: (Name: LOLoad Schema:
>>>>>>
>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # Physical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-5
>>>>>>   |
>>>>>>   |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>       |
>>>>>>       |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>           |
>>>>>>           |---grpall: Local Rearrange[tuple]{chararray}(false) - 
>>>>>> scope-3
>>>>>>               |   |
>>>>>>               |   Constant(all) - scope-4
>>>>>>               |
>>>>>>               |---longDesc:
>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>
>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler 
>>>>>> -
>>>>>> File concatenation threshold: 100 optimistic? false
>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>> - MR plan size before optimization: 1
>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>> - MR plan size after optimization: 1
>>>>>> #--------------------------------------------------
>>>>>> # Map Reduce Plan
>>>>>> #--------------------------------------------------
>>>>>> MapReduce node scope-10
>>>>>> Map Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-23
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>   |   |
>>>>>>   |   Project[chararray][0] - scope-12
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - 
>>>>>> scope-13
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-14
>>>>>>   |
>>>>>>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>       |
>>>>>>       |---longDesc:
>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - 
>>>>>> scope-0--------
>>>>>> Combine Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-27
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>   |   |
>>>>>>   |   Project[chararray][0] - scope-16
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>>>>> scope-17
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-18
>>>>>>   |
>>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>>> Reduce Plan
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-19
>>>>>>   |
>>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>>> Global sort: false
>>>>>> ----------------
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney 
>>>>>> <[email protected] <mailto:[email protected]>>
>>>>> wrote:
>>>>>>
>>>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>>>> identically) and paste the output here. It will show whether the
>>>>> combiner
>>>>>>> is being used,
>>>>>>>
>>>>>>> 2012/7/3 Ruslan Al-Fakikh <[email protected] 
>>>>>>> <mailto:[email protected]>>
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>>>> forces combiner. You should make sure that combiner is really used
>>>>>>>> here. It can be disabled in some situations. I've encountered such
>>>>>>>> situations many times when a job is tooo heavy in case no 
>>>>>>>> combiner is
>>>>>>>> applied.
>>>>>>>>
>>>>>>>> Ruslan
>>>>>>>>
>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S 
>>>>>>>> <[email protected] <mailto:[email protected]>>
>>>>>>> wrote:
>>>>>>>>> Right!!
>>>>>>>>>
>>>>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>>>>> 'group all'. How can that be confirmed?
>>>>>>>>>
>>>>>>>>> On 7/3/12, Jonathan Coveney <[email protected] 
>>>>>>>>> <mailto:[email protected]>> wrote:
>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and 
>>>>>>>>>> as such,
>>>>>>>> will
>>>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>>>>
>>>>>>>>>> 2012/7/2 Subir S <[email protected] 
>>>>>>>>>> <mailto:[email protected]>>
>>>>>>>>>>
>>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>>>>> group
>>>>>>>>>>> and sum may be.
>>>>>>>>>>>
>>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>>>>
>>>>>>>>>>> On 7/3/12, Sheng Guo <[email protected] 
>>>>>>>>>>> <mailto:[email protected]>> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>>>>> records.
>>>>>>>>>>>>
>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>
>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of 
>>>>>>>>>>>> time
>>>>>>> and
>>>>>>>>>>> hang
>>>>>>>>>>>> up, and or die.
>>>>>>>>>>>> I thought counting should be simple enough, so what is the 
>>>>>>>>>>>> best way
>>>>>>>> to
>>>>>>>>>>> do a
>>>>>>>>>>>> counting in pig?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> Sheng
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Ruslan Al-Fakikh
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>
>>
> 


Reply via email to