Hi Davide,
Your UDF is doing a lot of intensive processing without reporting its progress.
The EvalFunc class has a reporter field; please call reporter.progress() from
your UDF while it works so that Hadoop doesn't kill your task.
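
As a rough illustration of the pattern (not code from your UDF), the sketch
below pings a progress callback every few iterations of a long loop.
ProgressSink, ProgressEveryN and emitSlots are hypothetical names, used only so
the example compiles without Pig on the classpath; inside an EvalFunc the
callback would be the reporter field mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the reporter object available inside an EvalFunc.
interface ProgressSink {
    void progress();
}

final class ProgressEveryN {
    // Builds `count` slot labels and pings the sink once every `interval`
    // items, so a long-running loop keeps signalling that it is alive.
    static List<String> emitSlots(int count, int interval, ProgressSink sink) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add("slot-" + i);
            if ((i + 1) % interval == 0) {
                sink.progress();
            }
        }
        return out;
    }
}
```

Reporting every N items instead of on every item keeps the overhead small. The
interval is a tuning choice, and reporting only helps if the loop itself is
guaranteed to finish.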

Nezih

-----Original Message-----
From: Davide Brambilla [mailto:[email protected]] 
Sent: Sunday, January 26, 2014 6:27 AM
To: [email protected]
Subject: custom UDF generates SpillableMemoryManager and task is killed

Hi,
   I'm new to Pig and I wrote a Pig UDF to generate a bag of tuples. It seems
to be correct: I've tested it and it works perfectly.
What have I missed?
Thanks

Davide B.

When I apply it to my data (220 million rows) using this script:

REGISTER '/mnt5/pig/udf_date.jar';
REGISTER '/opt/cloudera/parcels/CDH-4.5.0-1.cdh4.5.0.p0.30/lib/pig/piggybank.jar';
jlsraw = load './data/mydata.dat' USING PigStorage(' ') as (ts:chararray,
    user:chararray, d1:chararray, item:chararray, d2:chararray, duration:long,
    d3:chararray);
jlsfilter = FILTER jlsraw BY $5/1000 >= 120;
jlsfilters = ORDER jlsfilter by user;
jlsslots = FOREACH jlsfilters GENERATE $1 as user, $3 as item,
    com.moviri.pig.udf.GenerateProfileBag(ts,(chararray)$5,(chararray)30) as t;
jlsunroll = FOREACH jlsslots GENERATE $0 as user, $1 as item, FLATTEN(t) as
    (slot:chararray, duration:long);
jlsgroup = GROUP jlsunroll BY (user,item,slot);
jlsfinal = FOREACH jlsgroup GENERATE group.user, group.item, group.slot,
    SUM(jlsunroll.duration) as duration;
jls = ORDER jlsfinal BY user;
STORE jls INTO '/user/hdfs/mydata_10M.dat';

The reduce step that processes these aliases (M: jlsfilters[3,13] C:  R:
jlsslots[4,11],jlsunroll[5,12]) uses a lot of memory, and the framework kills
the task because it fails to report its status. The kill message and the
reducer log follow.

*=== Killing log ===*
Task attempt_201401261145_0013_r_000000_0 failed to report status for 600 
seconds. Killing!

*=== Reducer log ===*

*2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask:
GetMapEventsThread exiting
2014-01-26 14:13:09,138 INFO org.apache.hadoop.mapred.ReduceTask:
getMapsEventsThread joined.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask:
Closed ram manager
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask:
Interleaved on-disk merge complete: 0 files left.
2014-01-26 14:13:09,139 INFO org.apache.hadoop.mapred.ReduceTask:
In-memory merge complete: 5 files left.
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Merging
5 sorted segments
2014-01-26 14:13:09,159 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 5 segments left of total size: 480601243 bytes
2014-01-26 14:13:09,162 INFO org.apache.hadoop.io.compress.CodecPool:
Got brand-new compressor [.snappy]
2014-01-26 14:13:15,327 INFO org.apache.hadoop.mapred.ReduceTask:
Merged 5 segments, 480601243 bytes to disk to satisfy reduce memory limit
2014-01-26 14:13:15,328 INFO org.apache.hadoop.mapred.ReduceTask:
Merging 1 files, 142586009 bytes from disk
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.ReduceTask:
Merging 0 segments, 0 bytes from memory into reduce
2014-01-26 14:13:15,329 INFO org.apache.hadoop.mapred.Merger: Merging
1 sorted segments
2014-01-26 14:13:15,334 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 1 segments left of total size: 480601235 bytes
2014-01-26 14:13:15,641 INFO org.apache.pig.data.SchemaTupleBackend:
Key [pig.schematuple] was not set... will not generate code.
2014-01-26 14:13:15,739 INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce:
Aliases being processed per job phase (AliasName[line,offset]): M:
jlsfilters[3,13] C:  R: jlsslots[4,11],jlsunroll[5,12]
2014-01-26 14:15:59,265 INFO
org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - 
Collection threshold init = 162594816(158784K) used =
5079178808(4960135K) committed = 6274088960(6127040K) max =
8388608000(8192000K)
2014-01-26 14:16:33,907 INFO
org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 
4959196460 bytes from 1 objects. init = 162594816(158784K) used =
5079178808(4960135K) committed = 6274088960(6127040K) max =
8388608000(8192000K)
2014-01-26 14:18:10,567 INFO
org.apache.pig.impl.util.SpillableMemoryManager: first memory handler
call- Usage threshold init = 162594816(158784K) used =
5878913448(5741126K) committed = 6274088960(6127040K) max =
8388608000(8192000K)
2014-01-26 14:19:16,325 INFO
org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 
6492776420 bytes from 1 objects. init = 162594816(158784K) used =
5878913448(5741126K) committed = 6274088960(6127040K) max =
8388608000(8192000K)*

*==== MY UDF CLASS ====*

public class GenerateProfileBag extends EvalFunc<DataBag> {
    private static final String PROFILE_SEPARATOR = "@";

    private BagFactory instance = BagFactory.getInstance();
    private TupleFactory tupleInstance = TupleFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() < 3) {
            return null;
        }

        DataBag res = instance.newDefaultBag();

        Tuple datetime = tupleInstance.newTuple();
        datetime.append(input.get(0));
        DateTime startDT = ISOHelper.parseDateTime(datetime);

        long duration = Long.parseLong((String) input.get(1));
        DateTime endDT = startDT.plus(duration);

        int minutes = Integer.parseInt((String) input.get(2));

        int dowStart = getDayOfWeek(startDT);
        int dowEnd = getDayOfWeek(endDT);

        int slotStart = getMinSlot(startDT, minutes);
        int slotEnd = getMinSlot(endDT, minutes);

        Tuple t = null;
        if (dowStart == dowEnd) {
            // Process data in the same day of week
            if (slotEnd == slotStart) {
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append((endDT.getMillis() - startDT.getMillis()) / 1000);
                res.add(t);
            } else if (slotStart < slotEnd) {
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append((slotStart + minutes - startDT.minuteOfDay().get()));
                res.add(t);
                slotStart += minutes;
                while (slotStart < slotEnd) {
                    t = tupleInstance.newTuple();
                    t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                    t.append(minutes);
                    res.add(t);
                    slotStart += minutes;
                }
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
                t.append((endDT.minuteOfDay().get() - slotEnd));
                res.add(t);
            }
        } else {
            // Process data that span over two or more periods
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotStart);
            t.append((slotStart + minutes - startDT.minuteOfDay().get()));
            res.add(t);
            slotStart += minutes;
            if (slotStart == 1440) {
                slotStart = 0;
                dowStart += 1;
                dowStart = dowStart % 8;
            }
            while (!(dowStart == dowEnd && slotStart == slotEnd)) {
                // Process till the right day and slot
                if (slotStart == 1440) {
                    slotStart = 0;
                    dowStart += 1;
                    dowStart = dowStart % 8;
                }
                t = tupleInstance.newTuple();
                t.append(dowStart + PROFILE_SEPARATOR + slotStart);
                t.append(minutes);
                res.add(t);
                slotStart += minutes;
            }
            t = tupleInstance.newTuple();
            t.append(dowStart + PROFILE_SEPARATOR + slotEnd);
            t.append((endDT.minuteOfDay().get() - slotEnd));
            res.add(t);
        }
        return res;
    }

    private int getMinSlot(DateTime dt, int minutes) {
        int slot = dt.minuteOfDay().get();
        slot = (slot / minutes) * minutes;
        return slot;
    }

    private int getDayOfWeek(DateTime dt) {
        int dow = dt.getDayOfWeek();
        if (dow == 7) dow = 1;
        else dow = dow + 1;
        return dow;
    }

    // It is required if Tuple or Bag is returned
    @Override
    public Schema outputSchema(Schema input) {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.BAG));
        funcList.add(new FuncSpec(this.getClass().getName(), s));
        return s;
    }

    // It is used if basic type is returned and to identify overloadings
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        Schema s = new Schema();
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        s.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        funcList.add(new FuncSpec(this.getClass().getName(), s));
        return funcList;
    }
}
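
A possible issue with the multi-day loop in exec, offered as a sketch and not a
confirmed diagnosis: the loop resets slotStart from 1440 to 0 inside its body
and then immediately advances it by minutes, so the exit test never sees slot 0
of a later day. If a record ends in the first slot of a day (for example at
00:10 with 30-minute slots) and does not start in the last slot of the previous
day, the loop appears never to terminate and the bag keeps growing. The
stripped-down version below replays the walk on plain ints. SlotWalk,
walkOriginal, walkNormalized and the cap parameter are hypothetical names, and
the 7-to-1 day wrap in the second method is an assumption based on
getDayOfWeek returning values from 1 to 7.

```java
// Hypothetical reduction of the UDF's multi-day branch to plain ints.
final class SlotWalk {
    static final int MINUTES_PER_DAY = 1440;

    // Same ordering as the UDF: the 1440 -> 0 wrap happens inside the loop
    // body, after the exit test. Returns the number of full slots visited,
    // or -1 if the cap is reached (treated here as "does not terminate").
    static int walkOriginal(int dow, int slot, int dowEnd, int slotEnd,
                            int minutes, int cap) {
        slot += minutes;
        if (slot == MINUTES_PER_DAY) {
            slot = 0;
            dow = (dow + 1) % 8;
        }
        int steps = 0;
        while (!(dow == dowEnd && slot == slotEnd)) {
            if (steps == cap) {
                return -1;
            }
            if (slot == MINUTES_PER_DAY) {
                slot = 0;
                dow = (dow + 1) % 8;
            }
            steps++;
            slot += minutes;
        }
        return steps;
    }

    // Variant that wraps right after every advance, so the exit test always
    // sees a normalized (day, slot) pair. Days wrap 7 -> 1 (assumption).
    static int walkNormalized(int dow, int slot, int dowEnd, int slotEnd,
                              int minutes, int cap) {
        int steps = 0;
        while (true) {
            slot += minutes;
            if (slot >= MINUTES_PER_DAY) {
                slot = 0;
                dow = dow % 7 + 1;
            }
            if (dow == dowEnd && slot == slotEnd) {
                return steps;
            }
            if (steps == cap) {
                return -1;
            }
            steps++;
        }
    }
}
```

Starting on day 2 in slot 1320 and ending on day 3 in slot 0 with 30-minute
slots, walkOriginal runs into the cap while walkNormalized stops after three
full slots. If that is what happens on the real data, reporting progress would
keep the task alive but would not let it finish.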
----------------------------------------------------------------------------------------
Davide Brambilla
ContentWise R&D
Moviri
[email protected]
phone: +39 02 49517001 mobile: 345 71 13 800 Moviri S.p.A. - Via Schiaffino, 11 
- 20158 Milano (MI) - ITALY
