= Converting UDFs from 0.1.x Pig to 0.2.x Pig =

== EvalFunc Changes ==

=== exec Signature Changes ===

In the earlier versions of Pig, the `exec` function had the following signature:

public void exec(Tuple input, T output) throws IOException;

In 0.2, the signature looks as follows:

public T exec(Tuple input) throws IOException;

The change is to return the output value rather than passing the output in as 
a parameter. This allows a function to return a null value.

The contract with the UDF is that for fatal errors that should terminate the 
entire processing, the UDF should throw an exception. For non-fatal errors, it 
should return `null`, which is recorded as a null value in the output.
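
The contract above can be sketched without any Pig classes. The following is a minimal illustration rather than Pig code: `ErrorContractSketch`, `parseAge`, and the rule that a missing record counts as fatal are all hypothetical, chosen only to show the two error paths.

```java
import java.io.IOException;

// Stand-alone sketch of the error contract; no Pig classes are used.
// parseAge and its "missing record is fatal" rule are hypothetical.
class ErrorContractSketch {
    static Integer parseAge(String field) throws IOException {
        if (field == null) {
            // Assumed fatal condition: throw to stop the whole job.
            throw new IOException("record is missing entirely");
        }
        try {
            return Integer.valueOf(field.trim());
        } catch (NumberFormatException e) {
            // Non-fatal: warn on stderr and return null for this record.
            System.err.println("Can't parse '" + field + "' as an integer");
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parseAge(" 42 "));
        System.out.println(parseAge("n/a"));
    }
}
```

A bad field produces a warning and a null, so processing continues; only the condition treated as fatal raises an exception.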

Let's take a look at the `TOLOWERCASE` function, which takes a string and 
generates a lowercase version of it.

In Pig 0.1.x, it looks as follows:

public class TOLOWERCASE extends EvalFunc<DataAtom> {
    public void exec(Tuple input, DataAtom output) throws IOException {
        String str = input.getAtomField(0).strval();
        output.setValue(str.toLowerCase());
    }
}

In Pig 0.2, the same function looks as follows:

1. public class TOLOWERCASE extends EvalFunc<String>
2. {
3.   public String exec(Tuple input) throws IOException {
4.        if (input == null || input.size() == 0)
5.            return null;
6.        String str;
7.        try {
8.            str = (String)input.get(0);
9.        } catch (Exception e){
10.          System.err.println("Can't convert field to a string; error = " + e.getMessage());
11.          return null;
12.      }
13.      return str.toLowerCase();
14.   }
15. }

There are a couple of things to note here:

   1. The initial check for a null or empty `Tuple` (lines 4-5) is needed for 
all UDFs that don't provide type-specific implementations, as discussed later.
   2. Data-related exceptions are caught and converted to a warning and a null 
return value (lines 9-12). Note that, for now, the warning should go to 
`stderr`. In later versions, we will propagate a logger to the UDFs. As with 
#1, converting exceptions to nulls is not needed in type-specific 
implementations, as discussed later.

=== Input Data Changes ===

In earlier versions of Pig, data was always passed to UDFs in the form of 
strings. Now the default type of the data is `bytearray`. The main reason for 
this is that treating non-ASCII data as strings can corrupt the data. The other 
is the cost of converting data to strings.
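
The corruption risk can be seen in plain Java. This is a small sketch that assumes the bytes are decoded as UTF-8; `ByteRoundTripSketch` and `roundTrip` are illustrative names, and no Pig classes are involved.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Shows why raw bytes are safer than strings for data of unknown encoding.
class ByteRoundTripSketch {
    // Decode bytes as UTF-8 text, then encode the text back to bytes.
    static byte[] roundTrip(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] ascii = {'p', 'i', 'g'};
        byte[] binary = {(byte) 0xFF, (byte) 0xFE, 'A'}; // not valid UTF-8
        System.out.println(Arrays.equals(ascii, roundTrip(ascii)));
        System.out.println(Arrays.equals(binary, roundTrip(binary)));
    }
}
```

The ASCII bytes survive the round trip unchanged, while the invalid bytes are replaced during decoding and no longer match the original.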

As a result, if a UDF expects data of a particular type, the script needs to 
make sure to pass the data to it in that type. This can be done either by 
declaring the correct data type in the `AS` clause of the `LOAD` statement or 
by explicitly casting the data going to the UDF to the required type.

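-- explicit cast of the data going to the UDF
A = LOAD 'data';
B = foreach A generate TOLOWERCASE((chararray)$1);

-- type declared in the AS clause of the LOAD statement
A = LOAD 'data' as (name: chararray);
B = foreach A generate TOLOWERCASE(name);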

=== outputSchema Changes ===

If a UDF does not override the `outputSchema` function, the output schema is 
assumed to be a single value of `chararray` type. This is compatible with 
earlier versions of Pig. *If a UDF returns a value other than a string, it will 
work correctly without specifying the schema but will force extra data 
conversion and thus be inefficient.*

Here is an example from the `SIZE` UDF:

public class SIZE extends EvalFunc<Long> {
    // exec and other details omitted
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
    }
}

The UDF declares that its output is of type `long`.

=== Type Specific Functions ===

In earlier versions of Pig, since type information was not available, each 
UDF had to choose the type in which to perform its computations. Most generic 
functions, like the Pig builtins, chose to use Double for arithmetic 
computations. With Pig 0.2, type-specific functions can be written to provide 
more efficient computation. Below is an example of the transformation of the 
`SUM` function:

Old code with some details omitted:

1.  public class SUM extends EvalFunc<DataAtom> implements Algebraic {
2.      public void exec(Tuple input, DataAtom output) throws IOException {
3.          output.setValue(sum(input));
4.      }
5.  static protected double sum(Tuple input) throws IOException {
6.  }    
7. }

New code with some details omitted:

1.  public class SUM extends EvalFunc<Double> implements Algebraic {
2.      public Double exec(Tuple input) throws IOException {
3.          try {
4.              return sum(input);
5.       } catch (ExecException ee) {
6.              IOException oughtToBeEE = new IOException();
7.              oughtToBeEE.initCause(ee);
8.              throw oughtToBeEE;
10.     }
11. }
12.  static protected Double sum(Tuple input) throws ExecException {
13.      DataBag values = (DataBag)input.get(0);

14.      // if we were handed an empty bag, return NULL (compatible with SQL)
15.      if(values.size() == 0) {
16.            return null;
17.      }
18.      double sum = 0;
19.      boolean sawNonNull = false;
20.      for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
21.          Tuple t = it.next();
22.          try {
23.                Double d = DataType.toDouble(t.get(0));
24.                if (d == null) continue;
25.                sawNonNull = true;
26.                sum += d;
27.          }catch(NumberFormatException nfe){
28.               // do nothing - essentially treat this particular input as null
29.          }catch(RuntimeException exp) {
30.               ExecException newE =  new ExecException("Error processing: " +
31.               t.toString() + exp.getMessage(), exp);
32.               throw newE;
33.          }
34.      }
35.      if(sawNonNull) {
36.          return new Double(sum);
37.      } else {
38.          return null;
39.      }
40.    }

41.   public Schema outputSchema(Schema input) {
42.        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
43.   }

44.   public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
45.        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
46.        funcList.add(new FuncSpec(this.getClass().getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
47.        funcList.add(new FuncSpec(DoubleSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
48.        funcList.add(new FuncSpec(FloatSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
49.        funcList.add(new FuncSpec(IntSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
50.        funcList.add(new FuncSpec(LongSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
51.        return funcList;
52.    }
53. }

The main thing to notice here is the `getArgToFuncMapping` method (lines 
44-52), which allows different classes to be specified to handle different 
input types. (The nested schema is needed because the input in this case is a 
bag of tuples, not just a tuple.) The main class handles the default case of 
`bytearray` by converting the data to `double` (line 23). This ensures backward 
compatibility. Note that it treats invalid data as null. Also, it sets the 
output schema to Double (lines 41-43).
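
The idea of mapping input types to handler classes can be sketched in plain Java. This is only an illustration of the lookup, not Pig's actual resolution logic: `FuncMappingSketch`, `argToFuncMapping`, and `resolve` are hypothetical, and Java classes stand in for Pig schemas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone sketch of choosing an implementation by input type.
// Not Pig's resolution logic; Java classes stand in for Pig schemas.
class FuncMappingSketch {
    // Input element type -> name of the class that handles it.
    static Map<Class<?>, String> argToFuncMapping() {
        Map<Class<?>, String> mapping = new LinkedHashMap<>();
        mapping.put(byte[].class, "SUM");        // default: untyped bytearray
        mapping.put(Double.class, "DoubleSum");
        mapping.put(Float.class, "FloatSum");
        mapping.put(Integer.class, "IntSum");
        mapping.put(Long.class, "LongSum");
        return mapping;
    }

    // Look up the handler for a value's type; null if none is registered.
    static String resolve(Object value) {
        return argToFuncMapping().get(value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(resolve(7));
        System.out.println(resolve(7.5));
        System.out.println(resolve(new byte[] {1, 2}));
    }
}
```

What Pig does when no entry matches is outside the scope of this sketch, which simply returns null in that case.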

Below is an example of one of the type-specific classes, which handles Integer 
data:

1. public class IntSum extends EvalFunc<Long> implements Algebraic {
2.     public Long exec(Tuple input) throws IOException {
3.       try {
4.          return sum(input);
5.       } catch (ExecException ee) {
6.          IOException oughtToBeEE = new IOException();
7.          oughtToBeEE.initCause(ee);
8.          throw oughtToBeEE;
9.       }
10.   }
11.   static protected  Long sum(Tuple input) throws ExecException {
12.        DataBag values = (DataBag)input.get(0);
13.        if(values.size() == 0) {
14.            return null;
15.        }
16.        long sum = 0;
17.        boolean sawNonNull = false;
18.        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
19.            Tuple t = (Tuple)it.next();
20.            try {
21.                Integer i = (Integer)(t.get(0));
22.                if (i == null) continue;
23.                sawNonNull = true;
24.                sum += i;
25.            }catch(RuntimeException exp) {
26.                ExecException newE =  new ExecException("Error processing: " +
27.                t.toString() + exp.getMessage(), exp);
28.                throw newE;
29.           }
30.        }
31.        if(sawNonNull) {
32.            return new Long(sum);
33.        } else {
34.            return null;
35.        }
36.    }
37.    public Schema outputSchema(Schema input) {
38.        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
39.    }    
40. }

There are a couple of things to notice here:

   1. Unlike the default class, this class does not convert the data to the 
specified type; instead, it expects the data to already be in that format 
(line 21). It also does not check for null/empty input. This is because Pig 
takes care of this by casting the inputs to the appropriate type and handling 
null values.
   2. The output schema is set to indicate that the output type is `long` 
(lines 37-39).

=== Report Progress ===

Part of the Hadoop infrastructure's job is to determine if a job is hanging and 
to kill it. To avoid being killed, the program needs to periodically report its 
progress. Pig's infrastructure takes care of this, and most UDFs do not need to 
report progress. However, a function that takes a very long time (more than 5 
minutes) to process a single `Tuple` needs to report progress periodically.

It can do so by periodically calling `reporter.progress()` during its 
execution:
public class MyUDF extends EvalFunc<String> {
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        // do some processing
        reporter.progress();
        // some more processing
        reporter.progress();
        // ... finish processing and return the result
    }
}
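
For a long-running loop, one way to apply this is to report progress every fixed number of iterations. The sketch below is stand-alone Java under that assumption: `ProgressSketch`, the `ProgressReporter` interface (a stand-in for the reporter object), and the `interval` parameter are all hypothetical.

```java
// Stand-alone sketch of periodic progress reporting; no Pig classes used.
// ProgressReporter stands in for the reporter object mentioned above.
class ProgressSketch {
    interface ProgressReporter {
        void progress();
    }

    // Process `items` units of work, reporting every `interval` units.
    static long process(int items, int interval, ProgressReporter reporter) {
        long checksum = 0;
        for (int i = 1; i <= items; i++) {
            checksum += i; // placeholder for expensive work
            if (i % interval == 0) {
                reporter.progress();
            }
        }
        return checksum;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        long result = process(1000, 100, () -> calls[0]++);
        System.out.println(result + " computed, " + calls[0] + " progress calls");
    }
}
```

Reporting on a fixed interval keeps the overhead low while still signalling regularly that the function is alive.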

== Load/Store Function Changes ==

There are a couple of changes in Pig 0.2 that affect load/store functions.

First, since `Tuple` is now an interface, the loader can't rely on the 
read/write functions of the `Tuple` class to load/store the data, as they would 
be implementation-specific. Instead, each load/store function needs to provide 
its own implementation.

Second, load and store functions need to be type-aware. The load function is 
expected to do two things: (1) produce a tuple with fields of `bytearray` type 
and (2) provide conversion routines from `bytearray` to all other types. There 
are a couple of reasons to separate the two. First, since the user is not 
required to provide a schema, Pig will treat the data as `bytearray` in the 
absence of one. Second, lazy conversion is likely to be more efficient since 
we might not need to convert all of the data, for instance, if some of it is 
thrown away by a filter or projected out by `generate`. For the store function, 
it means performing conversions from the real type of the data to the format in 
which it should be stored.
Below is an example of how !PigStorage looks now (with some details omitted):

1. public class PigStorage extends Utf8StorageConverter implements 
ReversibleLoadStoreFunc {
2. public Tuple getNext() throws IOException {
3.     if (in == null || in.getPosition() > end) {
4.          return null;
5.     }
6.     if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
7.     mBuf.reset();
8.     while (true) {
9.        // Hadoop's FSDataInputStream (which my input stream is based
10.      // on at some point) is buffered, so I don't need to buffer.
11.      int b = in.read();
12.      if (b == fieldDel) {
13.          readField();
14.      } else if (b == recordDel) {
15.          readField();
16.          Tuple t =  mTupleFactory.newTuple(mProtoTuple);
17.          mProtoTuple.clear();
18.          return t;
19.      } else if (b == -1) {
20.           // hit end of file
21.           return null;
22.      } else {
23.            mBuf.write(b);
25.      }
26.   }
27. }
28. public void putNext(Tuple f) throws IOException {
29.     // I have to convert integer fields to string, and then to bytes.
30.     // If I use a DataOutputStream to convert directly from integer to
31.     // bytes, I don't get a string representation.
32.     int sz = f.size();
33.     for (int i = 0; i < sz; i++) {
34.         Object field;
35.         try {
36.             field = f.get(i);
37.         } catch (ExecException ee) {
38.             throw new IOException(ee);
39.         }
40.         switch (DataType.findType(field)) {
41.             case DataType.NULL:
42.                 break; // just leave it empty
43.             case DataType.BOOLEAN:
44.                 mOut.write(((Boolean)field).toString().getBytes());
45.                 break;
46.  }
47.  public Schema determineSchema(URL fileName) throws IOException {
48.     return null;
49.  }
50.  public void fieldsToRead(Schema schema) {
        // do nothing
51.  }
52. }

There are several things to note here:

   1. Use of `TupleFactory` (line 16), discussed earlier.
   2. Type-specific handling of writing tuples (lines 40-45).
   3. Presence of the `determineSchema` function. This is for future use and 
should return `null` in Pig 0.2.
   4. Presence of the `fieldsToRead` function. This is again for future use and 
should be left empty in Pig 0.2.
   5. Extension of the `Utf8StorageConverter` class (line 1). This class 
performs conversion of UTF-8 strings into Pig types and can be used by other 
loaders that work with data stored in the same format. In the future, we will 
also provide a similar class for data stored in a binary format.

abstract public class Utf8StorageConverter {
    public DataBag bytesToBag(byte[] b) throws IOException {
        Object o;
        try {
            o = parseFromBytes(b);
        } catch (ParseException pe) {
            throw new IOException(pe.getMessage());
        }
        return (DataBag)o;
    }

    public String bytesToCharArray(byte[] b) throws IOException {
        return new String(b);
    }

    // remaining conversion routines omitted
}


