Yes Ashutosh, that is the case, and here is the code for the UDF. Let me know
what you find.

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class GroupSum extends EvalFunc<DataBag> {
    private final TupleFactory mTupleFactory;
    private final BagFactory mBagFactory;

    public GroupSum() {
        this.mTupleFactory = TupleFactory.getInstance();
        this.mBagFactory = BagFactory.getInstance();
    }

    @Override
    public DataBag exec(Tuple input) throws IOException {
        // Expect exactly one field: the bag of tuples produced by the GROUP.
        if (input.size() != 1) {
            int errCode = 2107;
            String msg = "GroupSum expects one input but received "
                    + input.size()
                    + " inputs. \n";
            throw new ExecException(msg, errCode);
        }
        DataBag output = this.mBagFactory.newDefaultBag();
        Object o1 = input.get(0);
        if (o1 instanceof DataBag) {
            DataBag bag1 = (DataBag) o1;
            // A single-row group needs no summing; return it untouched.
            if (bag1.size() == 1L) {
                return bag1;
            }
            sumBag(bag1, output);
        }
        return output;
    }

    // Sums fields 1-5 across all tuples in the bag and emits a single tuple,
    // reusing the first row (field 0 carries the group id).
    private void sumBag(DataBag o1, DataBag emitTo) throws IOException {
        Iterator<Tuple> i1 = o1.iterator();
        Tuple row = null;
        Tuple firstRow = null;

        int fld1 = 0, fld2 = 0, fld3 = 0, fld4 = 0, fld5 = 0;
        int cnt = 0;
        while (i1.hasNext()) {
            row = i1.next();
            if (cnt == 0) {
                firstRow = row;
            }
            fld1 += (Integer) row.get(1);
            fld2 += (Integer) row.get(2);
            fld3 += (Integer) row.get(3);
            fld4 += (Integer) row.get(4);
            fld5 += (Integer) row.get(5);
            cnt++;
        }
        if (firstRow == null) {
            // Empty bag: nothing to emit.
            return;
        }
        // Field 0 has the id in it; overwrite fields 1-5 with the group totals.
        firstRow.set(1, fld1);
        firstRow.set(2, fld2);
        firstRow.set(3, fld3);
        firstRow.set(4, fld4);
        firstRow.set(5, fld5);
        emitTo.add(firstRow);
    }

    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema tupleSchema = new Schema();
            tupleSchema.add(input.getField(0));
            tupleSchema.setTwoLevelAccessRequired(true);
            return tupleSchema;
        } catch (Exception e) {
            // If the input schema is unavailable, fall back to an unknown schema.
            return null;
        }
    }
}
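
For reference, here is roughly how I invoke it in place of the builtin SUMs in
the last step of my script. The REGISTER path and package name below are
placeholders (not the real ones), and the FLATTEN just unwraps the single
summed tuple the UDF returns:

REGISTER /path/to/groupsum.jar;
DEFINE GroupSum mypackage.GroupSum();

grp_id = GROUP Final_table BY ID;
Final_data = FOREACH grp_id GENERATE FLATTEN(GroupSum(Final_table));
STORE Final_data INTO 'final_data_out';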


On 7/9/10 2:32 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:

> Hi Syed,
> 
> Do you mean your query fails with OOME if you use Pig's builtin SUM,
> but succeeds if you use your own SUM UDF? If that is so, that's
> interesting. I have a hunch why that is the case, but would like to
> confirm. Would you mind sharing your SUM UDF?
> 
> Ashutosh
> On Fri, Jul 9, 2010 at 12:50, Syed Wasti <mdwa...@hotmail.com> wrote:
>> Hi Ashutosh,
>> I did not try options 2 and 3; I shall work on that sometime next week.
>> Increasing the heap size alone did not help initially, but with the increased
>> heap size I came up with a UDF to do the SUM on the grouped data for the last
>> step in my script, and it now completes my query without any errors.
>> 
>> Syed
>> 
>> 
>> On 7/8/10 5:58 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:
>> 
>>> Aah.. forgot to mention how to set that param in 3). While launching
>>> Pig, provide it as a -D command-line switch, as follows:
>>> pig -Dpig.cachedbag.memusage=0.02f myscript.pig
>>> 
>>> On Thu, Jul 8, 2010 at 17:45, Ashutosh Chauhan
>>> <ashutosh.chau...@gmail.com> wrote:
>>>> I would recommend the following, in order:
>>>> 
>>>> 1) Increasing the heap size should help (one way to pass it is sketched
>>>> below).
>>>> 2) It seems you are on 0.7. There are a couple of memory fixes we have
>>>> committed, both on the 0.7 branch and on trunk. Those should help as
>>>> well, so build Pig from either trunk or the 0.7 branch and use that.
>>>> 3) Only if these don't help, try tuning the param
>>>> pig.cachedbag.memusage. By default it is set at 0.1; lowering it
>>>> should help. Try 0.05, then 0.02, and then further down. The downside is
>>>> that the lower you go, the slower your query will run.
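>>>> 
>>>> For 1), one way to pass a bigger heap to the map and reduce tasks is on the
>>>> pig command line. This is only a sketch: the property below is the standard
>>>> Hadoop 0.20 task-heap setting, the -Xmx value is just an example, and it
>>>> assumes your cluster does not mark that property final:
>>>> pig -Dmapred.child.java.opts=-Xmx2048m myscript.pig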
>>>> 
>>>> Let us know if these changes get your query to completion.
>>>> 
>>>> Ashutosh
>>>> 
>>>> On Thu, Jul 8, 2010 at 15:48, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>> Thanks Ashutosh. Is there any workaround for this? Will increasing the
>>>>> heap size help?
>>>>> 
>>>>> 
>>>>> On 7/8/10 1:59 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:
>>>>> 
>>>>>> Syed,
>>>>>> 
>>>>>> You are likely hit by https://issues.apache.org/jira/browse/PIG-1442 .
>>>>>> Your query and stack trace look very similar to the one in the jira
>>>>>> ticket. This may get fixed in the 0.8 release.
>>>>>> 
>>>>>> Ashutosh
>>>>>> 
>>>>>> On Thu, Jul 8, 2010 at 13:42, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>>>> Sorry about the delay; I was held up with different things.
>>>>>>> Here is the script and the errors below:
>>>>>>> 
>>>>>>> AA = LOAD 'table1' USING PigStorage('\t') as
>>>>>>> (ID,b,c,d,e,f,g,h,i,j,k,l,m,n,o);
>>>>>>> 
>>>>>>> AB = FOREACH AA GENERATE ID, e, f, n,o;
>>>>>>> 
>>>>>>> AC = FILTER AB BY o == 1;
>>>>>>> 
>>>>>>> AD = GROUP AC BY (ID, b);
>>>>>>> 
>>>>>>> AE = FOREACH AD { A = DISTINCT AC.d;
>>>>>>>        GENERATE group.ID, (chararray) 'S' AS type, group.b, (int)
>>>>>>> COUNT_STAR(filt) AS cnt, (int) COUNT(A) AS cnt_distinct; }
>>>>>>> 
>>>>>>> The same steps are repeated to load 5 different tables and then a UNION
>>>>>>> is
>>>>>>> done on them.
>>>>>>> 
>>>>>>> Final_res = UNION AE, AF, AG, AH, AI;
>>>>>>> 
>>>>>>> The actual number of columns will be 15; here I am showing it with one
>>>>>>> table.
>>>>>>> 
>>>>>>> Final_table = FOREACH Final_res GENERATE ID,
>>>>>>>                (type == 'S' AND b == 1 ? cnt : 0) AS 12_tmp,
>>>>>>>                (type == 'S' AND b == 2 ? cnt : 0) AS 13_tmp,
>>>>>>>                (type == 'S' AND b == 1 ? cnt_distinct : 0) AS 12_distinct_tmp,
>>>>>>>                (type == 'S' AND b == 2 ? cnt_distinct : 0) AS 13_distinct_tmp;
>>>>>>> 
>>>>>>> It works fine until here; it is only after adding this last part of the
>>>>>>> query that it starts throwing heap errors.
>>>>>>> 
>>>>>>> grp_id =    GROUP Final_table BY ID;
>>>>>>> 
>>>>>>> Final_data = FOREACH grp_id GENERATE group AS ID,
>>>>>>> SUM(Final_table.12_tmp), SUM(Final_table.13_tmp),
>>>>>>> SUM(Final_table.12_distinct_tmp), SUM(Final_table.13_distinct_tmp);
>>>>>>> 
>>>>>>> STORE Final_data;
>>>>>>> 
>>>>>>> 
>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>  at java.util.ArrayList.<init>(ArrayList.java:112)
>>>>>>>  at org.apache.pig.data.DefaultTuple.<init>(DefaultTuple.java:63)
>>>>>>>  at org.apache.pig.data.DefaultTupleFactory.newTuple(DefaultTupleFactory.java:35)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.bytesToTuple(DataReaderWriter.java:55)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:136)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:130)
>>>>>>>  at org.apache.pig.data.DefaultTuple.readFields(DefaultTuple.java:289)
>>>>>>>  at org.apache.pig.impl.io.PigNullableWritable.readFields(PigNullableWritable.java:114)
>>>>>>>  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>>>>>>>  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>>>>>>>  at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>>>>>>>  at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>>>>>>>  at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>>>>>>>  at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>> 
>>>>>>> 
>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.createDataBag(POCombinerPackage.java:139)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.getNext(POCombinerPackage.java:148)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.processOnePackageOutput(PigCombiner.java:168)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:159)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:50)
>>>>>>>  at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>>>>>>>  at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>> 
>>>>>>> 
>>>>>>> Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>  at java.util.AbstractList.iterator(AbstractList.java:273)
>>>>>>>  at org.apache.pig.data.DefaultTuple.getMemorySize(DefaultTuple.java:185)
>>>>>>>  at org.apache.pig.data.InternalCachedBag.add(InternalCachedBag.java:89)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.getNext(POCombinerPackage.java:168)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.processOnePackageOutput(PigCombiner.java:168)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:159)
>>>>>>>  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:50)
>>>>>>>  at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>>>>>>>  at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>> 
>>>>>>> 
>>>>>>> Error: GC overhead limit exceeded
>>>>>>> -------
>>>>>>> Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>  at org.apache.pig.data.DefaultTupleFactory.newTuple(DefaultTupleFactory.java:35)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.bytesToTuple(DataReaderWriter.java:55)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:136)
>>>>>>>  at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:130)
>>>>>>>  at org.apache.pig.data.DefaultTuple.readFields(DefaultTuple.java:289)
>>>>>>>  at org.apache.pig.impl.io.PigNullableWritable.readFields(PigNullableWritable.java:114)
>>>>>>>  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>>>>>>>  at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>>>>>>>  at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>>>>>>>  at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>>>>>>>  at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>>>>>>>  at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>  at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On 7/7/10 5:50 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Syed,
>>>>>>>> 
>>>>>>>> One-line stack traces aren't much help :) Please provide the full
>>>>>>>> stack trace and the Pig script that produced it, and we can take a look.
>>>>>>>> 
>>>>>>>> Ashutosh
>>>>>>>> On Wed, Jul 7, 2010 at 14:09, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I am running my Pig scripts on our QA cluster (with 4 datanodes, see
>>>>>>>>> below), which has the Cloudera CDH2 release installed and a global heap
>>>>>>>>> max of -Xmx4096m. I am constantly getting OutOfMemory errors (see below)
>>>>>>>>> on my map and reduce jobs when I try to run my script against large
>>>>>>>>> data, where it produces around 600 maps.
>>>>>>>>> Looking for some tips on the best configuration for Pig and how to get
>>>>>>>>> rid of these errors. Thanks.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Error: GC overhead limit exceeded
>>>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>>> 
>>>>>>>>> Regards
>>>>>>>>> Syed
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
>> 
> 

