Yes Ashutosh, that is the case, and here is the code for the UDF. Let me know what you find. For context, the final step calls the UDF roughly like this (the jar path and package name below are placeholders, and the relation names match the trimmed script quoted further down):
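REGISTER /path/to/groupsum.jar;            -- placeholder path
DEFINE GroupSum com.example.GroupSum();    -- placeholder package name

grp_id = GROUP Final_table BY ID;

-- GroupSum takes the whole bag for a group and returns a single-tuple bag:
-- field 0 (the id) is kept from the first row and fields 1-5 are replaced
-- by their column sums, so one call replaces the per-column builtin SUMs.
-- (The trimmed script quoted below shows fewer columns than the real
-- 15-column one.)
Final_data = FOREACH grp_id GENERATE FLATTEN(GroupSum(Final_table));

And the UDF itself: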
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class GroupSum extends EvalFunc<DataBag> {

    private TupleFactory mTupleFactory;
    private BagFactory mBagFactory;

    public GroupSum() {
        this.mTupleFactory = TupleFactory.getInstance();
        this.mBagFactory = BagFactory.getInstance();
    }

    public DataBag exec(Tuple input) throws IOException {
        // The original check was "input.size() < 0", which can never be true;
        // the error message says one input is expected, so test for that.
        if (input.size() != 1) {
            int errCode = 2107;
            String msg = "GroupSum expects one input but received " + input.size() + " inputs.\n";
            throw new ExecException(msg, errCode);
        }
        try {
            DataBag output = this.mBagFactory.newDefaultBag();
            Object o1 = input.get(0);
            if (o1 instanceof DataBag) {
                DataBag bag1 = (DataBag) o1;
                // A single-row group needs no summing.
                if (bag1.size() == 1L) {
                    return bag1;
                }
                sumBag(bag1, output);
            }
            return output;
        } catch (ExecException ee) {
            throw ee;
        }
    }

    private void sumBag(DataBag o1, DataBag emitTo) throws IOException {
        Iterator<?> i1 = o1.iterator();
        Tuple row = null;
        Tuple firstRow = null;
        int fld1 = 0, fld2 = 0, fld3 = 0, fld4 = 0, fld5 = 0;
        int cnt = 0;
        while (i1.hasNext()) {
            row = (Tuple) i1.next();
            if (cnt == 0) {
                firstRow = row;
            }
            fld1 += (Integer) row.get(1);
            fld2 += (Integer) row.get(2);
            fld3 += (Integer) row.get(3);
            fld4 += (Integer) row.get(4);
            fld5 += (Integer) row.get(5);
            cnt++;
        }
        if (firstRow == null) {
            // Empty bag: nothing to sum, nothing to emit.
            return;
        }
        // Field 0 holds the id; reuse the first row and overwrite
        // fields 1-5 with the column sums.
        firstRow.set(1, fld1);
        firstRow.set(2, fld2);
        firstRow.set(3, fld3);
        firstRow.set(4, fld4);
        firstRow.set(5, fld5);
        emitTo.add(firstRow);
    }

    public Schema outputSchema(Schema input) {
        try {
            // Pass through the schema of the input bag.
            Schema tupleSchema = new Schema();
            tupleSchema.add(input.getField(0));
            tupleSchema.setTwoLevelAccessRequired(true);
            return tupleSchema;
        } catch (Exception e) {
            return null;
        }
    }
}

On 7/9/10 2:32 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:

> Hi Syed,
>
> Do you mean your query fails with OOME if you use Pig's builtin SUM,
> but succeeds if you use your own SUM UDF? If that is so, that's
> interesting. I have a hunch why that is the case, but would like to
> confirm. Would you mind sharing your SUM UDF?
>
> Ashutosh
> On Fri, Jul 9, 2010 at 12:50, Syed Wasti <mdwa...@hotmail.com> wrote:
>> Hi Ashutosh,
>> I did not try options 2 and 3; I shall work on those sometime next week.
>> Increasing the heap size alone did not help initially, but with the
>> increased heap size I came up with a UDF to do the SUM on the grouped
>> data for the last step in my script, and my query now completes without
>> any errors.
>>
>> Syed
>>
>>
>> On 7/8/10 5:58 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:
>>
>>> Aah.. forgot to tell you how to set that param in 3). While launching
>>> pig, provide it as a -D command-line switch, as follows:
>>> pig -Dpig.cachedbag.memusage=0.02f myscript.pig
>>>
>>> On Thu, Jul 8, 2010 at 17:45, Ashutosh Chauhan
>>> <ashutosh.chau...@gmail.com> wrote:
>>>> I would recommend the following things, in order:
>>>>
>>>> 1) Increasing the heap size should help.
>>>> 2) It seems you are on 0.7. There are a couple of memory fixes we have
>>>> committed, both on the 0.7 branch and on trunk. Those should help as
>>>> well, so build Pig from either trunk or the 0.7 branch and use that.
>>>> 3) Only if these don't help, you can try tuning the param
>>>> pig.cachedbag.memusage. By default it is set to 0.1; lowering it
>>>> should help. Try 0.05, then 0.02, and then further down. The downside
>>>> is that the lower you go, the slower your query will run.
>>>>
>>>> Let us know if these changes get your query to completion.
>>>>
>>>> Ashutosh
>>>>
>>>> On Thu, Jul 8, 2010 at 15:48, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>> Thanks Ashutosh, is there any workaround for this? Will increasing
>>>>> the heap size help?
>>>>>
>>>>>
>>>>> On 7/8/10 1:59 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com> wrote:
>>>>>
>>>>>> Syed,
>>>>>>
>>>>>> You are likely hit by https://issues.apache.org/jira/browse/PIG-1442 .
>>>>>> Your query and stack trace look very similar to the one in the jira
>>>>>> ticket. This may be fixed by the 0.8 release.
>>>>>>
>>>>>> Ashutosh
>>>>>>
>>>>>> On Thu, Jul 8, 2010 at 13:42, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>>>> Sorry about the delay, I was held up with different things.
>>>>>>> Here are the script and the errors below:
>>>>>>>
>>>>>>> AA = LOAD 'table1' USING PigStorage('\t') AS
>>>>>>> (ID,b,c,d,e,f,g,h,i,j,k,l,m,n,o);
>>>>>>>
>>>>>>> AB = FOREACH AA GENERATE ID, b, d, e, f, n, o;
>>>>>>>
>>>>>>> AC = FILTER AB BY o == 1;
>>>>>>>
>>>>>>> AD = GROUP AC BY (ID, b);
>>>>>>>
>>>>>>> AE = FOREACH AD { A = DISTINCT AC.d;
>>>>>>> GENERATE group.ID, (chararray) 'S' AS type, group.b,
>>>>>>> (int) COUNT_STAR(AC) AS cnt, (int) COUNT(A) AS cnt_distinct; }
>>>>>>>
>>>>>>> The same steps are repeated to load 5 different tables, and then a
>>>>>>> UNION is done on them.
>>>>>>>
>>>>>>> Final_res = UNION AE, AF, AG, AH, AI;
>>>>>>>
>>>>>>> The actual number of columns will be 15; here I am showing it with
>>>>>>> one table.
>>>>>>>
>>>>>>> Final_table = FOREACH Final_res GENERATE ID,
>>>>>>>     (type == 'S' AND b == 1 ? cnt : 0) AS tmp_12,
>>>>>>>     (type == 'S' AND b == 2 ? cnt : 0) AS tmp_13,
>>>>>>>     (type == 'S' AND b == 1 ? cnt_distinct : 0) AS tmp_12_distinct,
>>>>>>>     (type == 'S' AND b == 2 ? cnt_distinct : 0) AS tmp_13_distinct;
>>>>>>>
>>>>>>> It works fine until here; it is only after adding this last part of
>>>>>>> the query that it starts throwing heap errors.
>>>>>>>
>>>>>>> grp_id = GROUP Final_table BY ID;
>>>>>>>
>>>>>>> Final_data = FOREACH grp_id GENERATE group AS ID,
>>>>>>>     SUM(Final_table.tmp_12), SUM(Final_table.tmp_13),
>>>>>>>     SUM(Final_table.tmp_12_distinct), SUM(Final_table.tmp_13_distinct);
>>>>>>>
>>>>>>> STORE Final_data;
>>>>>>>
>>>>>>>
>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>     at java.util.ArrayList.<init>(ArrayList.java:112)
>>>>>>>     at org.apache.pig.data.DefaultTuple.<init>(DefaultTuple.java:63)
>>>>>>>     at org.apache.pig.data.DefaultTupleFactory.newTuple(DefaultTupleFactory.java:35)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.bytesToTuple(DataReaderWriter.java:55)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:136)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:130)
>>>>>>>     at org.apache.pig.data.DefaultTuple.readFields(DefaultTuple.java:289)
>>>>>>>     at org.apache.pig.impl.io.PigNullableWritable.readFields(PigNullableWritable.java:114)
>>>>>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>>>>>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>>>>>>>     at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>>>>>>>     at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>>>>>>>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>>>>>>>     at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>>
>>>>>>>
>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.createDataBag(POCombinerPackage.java:139)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.getNext(POCombinerPackage.java:148)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.processOnePackageOutput(PigCombiner.java:168)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:159)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:50)
>>>>>>>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>>>>>>>     at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>>
>>>>>>>
>>>>>>> Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>     at java.util.AbstractList.iterator(AbstractList.java:273)
>>>>>>>     at org.apache.pig.data.DefaultTuple.getMemorySize(DefaultTuple.java:185)
>>>>>>>     at org.apache.pig.data.InternalCachedBag.add(InternalCachedBag.java:89)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage.getNext(POCombinerPackage.java:168)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.processOnePackageOutput(PigCombiner.java:168)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:159)
>>>>>>>     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner$Combine.reduce(PigCombiner.java:50)
>>>>>>>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>>>>>>>     at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>>
>>>>>>>
>>>>>>> Error: GC overhead limit exceeded
>>>>>>> -------
>>>>>>> Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>     at org.apache.pig.data.DefaultTupleFactory.newTuple(DefaultTupleFactory.java:35)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.bytesToTuple(DataReaderWriter.java:55)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:136)
>>>>>>>     at org.apache.pig.data.DataReaderWriter.readDatum(DataReaderWriter.java:130)
>>>>>>>     at org.apache.pig.data.DefaultTuple.readFields(DefaultTuple.java:289)
>>>>>>>     at org.apache.pig.impl.io.PigNullableWritable.readFields(PigNullableWritable.java:114)
>>>>>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>>>>>>>     at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>>>>>>>     at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>>>>>>>     at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>>>>>>>     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>>>>>>>     at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1217)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1227)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648)
>>>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 7/7/10 5:50 PM, "Ashutosh Chauhan" <ashutosh.chau...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Syed,
>>>>>>>>
>>>>>>>> One-line stack traces aren't much help :) Please provide the full
>>>>>>>> stack trace and the Pig script which produced it, and we can take
>>>>>>>> a look.
>>>>>>>>
>>>>>>>> Ashutosh
>>>>>>>> On Wed, Jul 7, 2010 at 14:09, Syed Wasti <mdwa...@hotmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am running my Pig scripts on our QA cluster (with 4 datanodes,
>>>>>>>>> see below), which has the Cloudera CDH2 release installed; the
>>>>>>>>> global heap max is -Xmx4096m. I am constantly getting OutOfMemory
>>>>>>>>> errors (see below) on my map and reduce jobs when I run my script
>>>>>>>>> against large data, where it produces around 600 maps.
>>>>>>>>> Looking for some tips on the best configuration for Pig to get
>>>>>>>>> rid of these errors. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Error: GC overhead limit exceeded
>>>>>>>>> Error: java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> Syed