Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 

The following page has been changed by AlanGates:

New page:
= Changes to Pig Being Done on the Types Branch =

== Objective ==

A number of pig developers have spent the past six months reworking the pig 
execution pipeline and adding types support.  This work has been done on the
types branch rather than in trunk.  This work was done in a branch because the 
changes were very destabilizing.  That work is now near completion.  At some
point in the near future the changes will be merged into the trunk.

These changes add a number of new features to pig.  They are also not fully 
backward compatible.  This document describes the new features, calls out areas
of incompatibility, and provides users with information on what they will need 
to change in their Pig Latin scripts and UDFs in order to work with the new
version of pig.

For the purposes of this document the work done on the types branch will be 
referred to as Pig 0.2.0.  The intention is to release these changes at that
revision number.

== New Features and Extensions ==

This section covers new features or changes/enhancements to the existing ones 
and the work required from users to adapt to these changes.

=== Addition of Types ===

Pig 0.1.0 supported a single basic type - unicode String.

Pig 0.2.0 supports the following new types:

   * `bag`: A collection of tuples. 
   * `tuple`: An ordered set of data (no changes). 
   * `map`: A set of key value pairs, where each key is a scalar and each 
value a datum of any type. 
   * `int`: 32 bit integer (Note that Pig does not support unsigned types. Use 
long instead)
   * `long`: 64 bit integer 
   * `float`: 32 bit floating point 
   * `double`: 64 bit floating point 
   * `chararray`: Character array with characters stored in a unicode format. 
   * `bytearray`: Array of bytes. 

==== Types in the load statement ====

The syntax of the `AS` clause has been extended to include type information:

A = LOAD 'data' USING PigStorage() AS (name, age: long, gpa: float);

The original syntax can still be used for the entire `AS` statement or parts of 
it. If type specification is omitted for a particular field, it is assumed to 
be of type `bytearray`.

`AS` can also be used to define complex structures in your data such as 
`tuples`, `bags`, or `maps`.

A = LOAD 'data' USING MyStorage() AS (T: tuple(name, age: long, gpa: float), B: 
bag{tuple(number: int)}, M: map[]); 

Words `tuple`, `bag`, and `map` in the type declaration are optional. Also, in 
Pig 0.2.0, bags can only contain tuples. This might change in the future. 
Finally, note that the map does not take names or types. The key of the map 
must be a scalar object (no tuples, maps, or bags) and the value can be of any type.

'''Users are encouraged to declare the type of the data whenever possible. This 
will result in better parse time error checking as well as more efficient 

==== Constants ====

Pig 0.2.0 provides constant representation for scalar and complex types. 

String (chararray) constants can be specified as ASCII characters (e.g. `x 
MATCHES 'abc'` ).

Any numeric constant consisting of just digits (e.g. `123`) is assigned the type 
of `int`. To specify a `long` constant, `l` or `L` must be appended to the 
number (e.g. `12345678L`). If the `l` or `L` are not specified but the number 
is too large to fit into an `int` the problem will be detected at parse time 
and the processing will be terminated. 

Any numeric constant with decimal point (e.g. `1.5`) and/or exponent (e.g. 
`5e+1`) is treated as double unless it ends with `f` or `F` in which case it is 
assigned type float (e.g. `1.5f`).

The complex constants are represented as shown below:

{(1, 5, 18)} - creates a bag
('john', 25, 5.6f) - creates a tuple
['john'#25, 'dan'#18, 'alice'#20] - creates a map

Note that bags must contain tuples; tuples can contain any data; keys of the 
map must be of scalar type and values can be of any type.

Complex constants can be used in the same places scalar constants can be used: 
in filters and generate commands:

A = LOAD 'data' USING MyStorage() AS (T: tuple(name:chararray, age: int));
B = FILTER A BY T == ('john', 25);
D = FOREACH B GENERATE T.name, [25#5.6], {(1, 5, 18)};

==== Cast ====

In Pig 0.1.0 all scalar data fields were represented as `chararray` and were 
cast to double for any arithmetic computations. 

With addition of types into 0.2.0, Pig now has a much richer set of implicit 
casts. It also introduced explicit casts.

The table below outlines allowed casts. The casts that are not supported will 
result in parse time error.

|| '''from / to''' || '''bag''' || '''tuple''' || '''map''' || '''int''' || '''long''' || '''float''' || '''double''' || '''chararray''' || '''bytearray''' ||
|| '''bag''' ||  || error || error || error || error || error || error || error || error ||
|| '''tuple''' || error ||  || error || error || error || error || error || error || error ||
|| '''map''' || error || error ||  || error || error || error || error || error || error ||
|| '''int''' || error || error || error ||  || yes || yes || yes || yes || error ||
|| '''long''' || error || error || error || yes ||  || yes || yes || yes || error ||
|| '''float''' || error || error || error || yes || yes ||  || yes || yes || error ||
|| '''double''' || error || error || error || yes || yes || yes ||  || yes || error ||
|| '''chararray''' || error || error || error || yes || yes || yes || yes ||  || error ||
|| '''bytearray''' || yes || yes || yes || yes || yes || yes || yes || yes ||  ||

Implicit casts are used when the declared/default type of the data does not 
match the operation in which it is used. For instance, 

A = LOAD 'data' AS (a: int, b: float);
B = foreach A generate a + b;

In this case, type promotion is used and `a` is implicitly cast to float.

A = LOAD 'data' AS (a, b);
B = foreach A generate a + b;

In this case, since the type of neither operand is declared, the default of 
`bytearray` is used. Both operands are implicitly cast to `double` 
to preserve compatibility with previous versions.

The following case is a little more tricky:

A = LOAD 'data' AS (a, b);
B = foreach A generate a + 1, b + 1;

Because the type of `b` is not specified, the choice of the operation will be 
based on the constant and will be selected as `int`. If field `b`
contains, for instance, `float` values, they will be truncated. '''This is a 
change from the 0.1.0 version of Pig''', where data was always promoted to double 
for arithmetic operations. 

Input data:
1, 1.5
2, 2.5

Pig 0.1.0:
2.0, 2.5
3.0, 3.5

Pig 0.2.0:
2, 2
3, 3
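
The difference between the two outputs can be reproduced outside Pig. The Java sketch below is only an analogy: it assumes that the 0.1.0 behavior amounts to double arithmetic and that the 0.2.0 behavior amounts to converting the field to an integer before adding. The class and method names are made up for illustration and are not part of Pig.

```java
class TruncationDemo {
    // Assumed model of Pig 0.1.0: every operand is promoted to double.
    static double addAsDouble(String field, int constant) {
        return Double.parseDouble(field) + constant;
    }

    // Assumed model of Pig 0.2.0 for an untyped field plus an int constant:
    // the field is converted to int (dropping the fraction) before the addition.
    static int addAsInt(String field, int constant) {
        return (int) Double.parseDouble(field) + constant;
    }

    public static void main(String[] args) {
        System.out.println(addAsDouble("1.5", 1)); // 2.5
        System.out.println(addAsInt("1.5", 1));    // 2
    }
}
```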

There are a couple of ways to address this issue. First and the easiest is to 
declare the correct schema of the input. Second is to use constants of the 
appropriate type:

A = LOAD 'data' AS (a, b);
B = foreach A generate a + 1, b + 1.0;

Third is to use explicit cast:

A = LOAD 'data' AS (a, b);
B = FOREACH A GENERATE a + 1, (float)b + 1;

==== 0.1.0 Operations ====

Addition of types required a complete rewrite of Pig operations. The detailed 
discussion is available at This 
section describes the changes with the highest impact for the users.

A general rule is followed that the result of any arithmetic computation is of 
the same type as the operands. For instance, a division or multiplication of 2 
integers results in an integer value.

Since in the 0.1.0 version of Pig, there were no numeric constants, string 
constants that contained numbers could be used as numbers, for example: 
`filter a by $1 > '5'`.  Now `'5'` will be interpreted as a chararray with the 
character 5, not as the number 5. In this particular case, a lexicographical 
comparison rather than a numeric one will be applied, which might not be what you 
want.
In Pig 0.1.0 and earlier, the comparison operators `<`, `>`, `<=`, `>=`, `!=`, 
`==` forced an operation to numeric, while
`lt`, `gt`, `lte`, `gte`, `neq`, `eq` forced an operation to string.  
Now `lt`, ''et al.'' are deprecated but allowed, and
they no longer force the operation to a string comparison.  `<`, ''et al.'' 
are applied to the data types as they are, without forcing them to numeric.  So 
`$1 > '5'` now compares `$1` to `'5'` as a chararray,
not as an integer.  To compare `$1` to the number 5, the syntax is now `$1 > 5`.
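
To see why a chararray comparison can give a different answer than a numeric one, here is a small Java analogy. It assumes that a chararray comparison behaves like Java's character-by-character `String.compareTo`; the class and method names are illustrative only.

```java
class ComparisonDemo {
    // String-style comparison: compares character by character.
    static boolean greaterAsString(String value, String constant) {
        return value.compareTo(constant) > 0;
    }

    // Numeric comparison: parses the value first.
    static boolean greaterAsNumber(String value, int constant) {
        return Integer.parseInt(value) > constant;
    }

    public static void main(String[] args) {
        System.out.println(greaterAsString("10", "5")); // false: '1' sorts before '5'
        System.out.println(greaterAsNumber("10", 5));   // true
    }
}
```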

Having different types in the equality comparison in a `FILTER` can cause 
unexpected results. Consider the following code:

A = LOAD 'data' AS (a: float, b: int);
B = FILTER A by a == 1.42;

In this case, a float column is compared to a double constant, which causes the 
float to be promoted to a double. The promotion itself is exact, but the float 
closest to 1.42 and the double closest to 1.42 are different values, so the 
comparison will fail even when `a` contains `1.42`. The solution is to avoid 
promotion by marking the constant as a float as well:

A = LOAD 'data' AS (a: float, b: int);
B = FILTER A by a == 1.42f;

Alternatively, you could declare the column to be `double` but that would be 
less efficient.
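
The same effect can be observed directly in Java, which is where Pig gets its float and double semantics. The sketch below is a standalone illustration with made-up names, not Pig code.

```java
class FloatPromotionDemo {
    // The float argument is promoted to double before the comparison.
    static boolean equalsDoubleConstant(float a) {
        return a == 1.42;
    }

    // Both sides are floats, so no promotion takes place.
    static boolean equalsFloatConstant(float a) {
        return a == 1.42f;
    }

    public static void main(String[] args) {
        float a = 1.42f;
        System.out.println(equalsDoubleConstant(a)); // false
        System.out.println(equalsFloatConstant(a));  // true
    }
}
```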

Similarly, in the 0.1.0 version of Pig, the following expression was valid:

A = LOAD 'data' AS (a, b);
B = FOREACH A generate a + '1.42';

This would cause both `a` and `'1.42'` to be converted to double prior to the 
addition. In Pig 0.2.0 this would produce an error since addition is not defined
for strings and `'1.42'` is now interpreted as a string constant. The code above 
should look as follows in Pig 0.2.0:

A = LOAD 'data' AS (a, b);
B = FOREACH A generate a + 1.42;

A `%` operator has been added that computes modulo:

A = LOAD 'data' AS (a: int, b: int);
B = FOREACH A generate a%b;

==== NULL Support ====

Pig 0.2.0 introduces the notion of `NULL` values. NULLs follow the SQL notion of 
missing data. NULLs can occur naturally in the data or can be the result of an 
operation. If the NULLs are part of the data, it is the responsibility of the 
`Load` function to handle them correctly. See discussion later.

The following operations can result in NULL values:

   * Division by zero. 
   * Return from a UDF 
   * Dereference of a map key that does not exist in a map. For example, given 
the map info containing `[name#fred, phone#5551212]`, if the user does 
info#address, a NULL will be returned. 
   * Access of a non-existent field of a tuple. This requires further explanation:

A = LOAD 'data' AS (a, b);
B = foreach A generate b;

In this case, if some rows of the data contain only a single column, a NULL 
will be injected into B for those rows.

Similarly, for loads without declared schema, NULLs will be injected if the 
columns are missing.

A = LOAD 'data';
B = foreach A generate $2;

However, in the following case, an error will be generated:

A = LOAD 'data' AS (a, b);
B = foreach A generate $3;

This is because the requested field is outside of the declared schema. '''This 
is a change compared to earlier Pig versions.'''

The table below describes how different operators interact with NULLs:

|| '''Operator''' || '''Interaction''' ||
|| Comparison operators || If either sub-expression is null, the result of the equality comparison will be null ||
|| Matches || If either the string being matched against or the string defining the match is null, the result will be null ||
|| Is null || Returns true if the tested value is null ||
|| Arithmetic operators, concat || If either sub-expression is null, the resulting expression is null ||
|| Size || If the tested object is null, size will return null ||
|| Dereference of a map or tuple || If the dereferenced map or tuple is null, the result will be null. ||
|| Cast || Casting a null from one type to another will result in a null ||
|| Aggregates || Built-in aggregate functions will ignore nulls, just as they do in SQL. However, user defined aggregates are free to handle nulls the way they see fit ||

Boolean expressions that result from comparison or `matches` operators can only 
appear in a `FILTER` operator, and if they result
in a null value, the filter does not pass them through. (Note that if X is NULL, 
both X and !X are null and would both be rejected by a filter.)

A = LOAD 'data' AS (name, age: int, gpa: float);
B = FILTER A by age > 30;
dump B;

In this case, records with NULL value of age will not be dumped.
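
The filter behavior described here is a form of three-valued logic. The Java sketch below models it with a nullable `Boolean`, under the assumption that only a definite "true" lets a record through. The names are hypothetical and none of this is Pig's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NullFilterDemo {
    // Comparison that yields null when either operand is null.
    static Boolean greaterThan(Integer a, Integer b) {
        if (a == null || b == null) {
            return null;
        }
        return a > b;
    }

    // Keep only the values for which the comparison is definitely true;
    // both false and null results are rejected.
    static List<Integer> filterGreaterThan(List<Integer> ages, int threshold) {
        List<Integer> kept = new ArrayList<>();
        for (Integer age : ages) {
            if (Boolean.TRUE.equals(greaterThan(age, threshold))) {
                kept.add(age);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(25, null, 42, 31);
        System.out.println(filterGreaterThan(ages, 30)); // [42, 31]
    }
}
```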

To test for null values, `is {not} null` construct can be used.

A = LOAD 'data' AS (name, age: int, gpa: float);
B = foreach A generate (gpa is null? 0 :gpa) ;

In this case, `B` will contain value of 0 if value of `gpa` was missing from 
the data.

You can also use `is {not} null` in a filter:
A = LOAD 'data' AS (name, age: int, gpa: float);
B = filter A by age is not null and gpa is null;

`B` will retain all the entries where `age` is specified while the `gpa` is not.

Loaders maintained by Pig such as `PigStorage` have been updated to produce 
NULL wherever data is missing. This means that the code below that worked with

A = LOAD 'data' AS (name, age, gpa);
B = FILTER A by name neq '';

will no longer produce correct results since empty strings are now interpreted 
as NULL values. The following code should be used instead:

A = LOAD 'data' AS (name, age, gpa);
B = FILTER A by name is not null;

=== Relational Operators ===

==== ORDER BY Extensions ====

In Pig 0.1.0 only ascending order was supported. With 0.2.0 you can specify 
the desired order on a per-column basis.

A = LOAD 'data' AS (name, age: int, gpa: float);
B = ORDER A BY age, gpa desc;

The default order is ascending. In the example above the data would be sorted 
in ascending order of age and in descending order of the gpa.

The other change for `ORDER BY` is that the declared type of the data is used. 
In the example above, numeric ordering will be used for both columns. If the
type of the columns is not specified, lexicographical order is used, the same 
way it was used in 0.1.0.
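
For readers who think in Java, the ordering `age, gpa desc` on typed columns corresponds roughly to a chained comparator. This is a sketch only: `Row` is a hypothetical stand-in for a record of `(name, age: int, gpa: float)`, and nothing here reflects how Pig implements sorting.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OrderByDemo {
    // Hypothetical stand-in for one row of (name, age: int, gpa: float).
    static final class Row {
        final String name;
        final int age;
        final float gpa;

        Row(String name, int age, float gpa) {
            this.name = name;
            this.age = age;
            this.gpa = gpa;
        }
    }

    // Sort by age ascending, then by gpa descending, both numerically.
    static List<Row> orderByAgeThenGpaDesc(List<Row> rows) {
        Comparator<Row> byAge = Comparator.comparingInt((Row r) -> r.age);
        Comparator<Row> byGpaDesc = Comparator.comparingDouble((Row r) -> r.gpa).reversed();
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(byAge.thenComparing(byGpaDesc));
        return sorted;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("a", 20, 3.0f));
        rows.add(new Row("b", 19, 2.0f));
        rows.add(new Row("c", 20, 3.5f));
        for (Row r : orderByAgeThenGpaDesc(rows)) {
            System.out.println(r.name + "," + r.age + "," + r.gpa);
        }
    }
}
```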

==== Limit ====

A new `LIMIT` operator is introduced that limits the number of output tuples 

A = LOAD 'data' AS (name, age: int, gpa: float);
B = limit A 100;

In the example above, the output will consist of 100 rows of A if A is large 
enough, or all the rows of A if A has
fewer than 100 rows. Note that there are no guarantees on which rows are 
returned and the rows can change from one run to the
next. Also, this is not a well distributed random sample.

A particular set of tuples can be requested by specifying an `ORDER BY` operator 
before `LIMIT`:

A = LOAD 'data' AS (name, age: int, gpa: float);
B = ORDER A BY age, gpa desc;
C = limit B 100;

In the example above, the first 100 rows of B would be returned in the order of B.

Note that `LIMIT` operator is optimized and in most cases will run more 
efficiently than the identical query without the limit. '''It is always a good 
idea to use limit if you can.'''

=== Grunt Improvements ===

==== Command History and Completion ====

Using up and down arrows now allows the users to navigate their command history 
similarly to most Unix shells. 

Also, the `TAB` key can be used to autocomplete Pig's reserved words as well as 
builtin functions. Users can also provide their own autocompletion list to, 
for instance, add their UDFs to the tab completion list. To do so, create 
a file called `autocomplete` in the directory from which the pig 
command is run. The file should contain the names of all the UDFs that the user uses. 
Note that the names are case sensitive.

Currently, autocompletion is not available for file system navigation.

==== Returning correct line number for syntax errors ====

In the 0.1.0 version of Pig, the error message resulting from a syntax error 
was misleading, always saying that the error was on line 1. This has been
addressed in Pig 0.2.0, where the correct line number is used in the message.

=== Performance improvements ===

Several performance improvements have been made in 0.2.0.

Most of these improvements are transparent to users; however, some require user action.

We have also made more aggressive use of the combiner. In Pig 0.2.0, the 
combiner is invoked in the following cases:

Y = group X ...;
Z = foreach Y generate simple_project || algebraic ...

In the example above, `simple_project` is just a projection of one of the 
columns of `Y` (`group` or `X` in this case), and `algebraic` is an algebraic UDF 
or a set of UDFs.  The `group` key can be omitted from the projection.

The following queries will all use combiner:

Y = group X by $0;
Z = foreach Y generate group, COUNT(X), SUM(X);
Y = group X by $0, $1;
Z = foreach Y  generate flatten(group), COUNT(X);
Y = group X all;
Z = foreach Y generate COUNT(X);
Y = group X by $0;
Z = foreach Y generate COUNT(X), group;

Cogroups and queries with non-algebraic UDFs do not yet use the combiner.

=== Builtins Update ===

==== Arithmetic UDFs ====

In the 0.1.0 version of Pig, the numeric values passed in and out of UDFs were 
double. With pig 0.2.0, the real type of the input data can be used to make 
operations more efficient. To take advantage of this, the data types must be 
declared in the `AS` clause of the load or the inputs into the UDF must be cast 
to the appropriate type.

==== PigStorage Extension ====

In Pig 0.2.0, !PigStorage UDF has been extended to handle complex data types. 
On the load side,  fields containing `tuples`, `bags`, and `maps` will be 
recognized and processed accordingly. Similarly, on the store side, complex 
fields will be serialized using the format below. The same format is expected 
on the load side as well:

Bag: {(tuple1),(tuple2),...,(tupleN)}
Tuple: (type1, type2,...,typeN)
Map: [key1#value1,key2#value2,...,keyN#valueN]
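
As a rough illustration of the text format above, the following Java sketch renders a tuple, a bag, and a map using the same delimiters. It is not !PigStorage's code: the class and helper names are invented, and escaping of delimiters inside values is ignored.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

class ComplexFormatDemo {
    // (field1,field2,...,fieldN)
    static String formatTuple(List<?> fields) {
        StringJoiner out = new StringJoiner(",", "(", ")");
        for (Object field : fields) {
            out.add(String.valueOf(field));
        }
        return out.toString();
    }

    // {(tuple1),(tuple2),...,(tupleN)}
    static String formatBag(List<? extends List<?>> tuples) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        for (List<?> tuple : tuples) {
            out.add(formatTuple(tuple));
        }
        return out.toString();
    }

    // [key1#value1,key2#value2,...,keyN#valueN]
    static String formatMap(Map<?, ?> map) {
        StringJoiner out = new StringJoiner(",", "[", "]");
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            out.add(entry.getKey() + "#" + entry.getValue());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = new LinkedHashMap<>(); // keeps insertion order
        ages.put("john", 25);
        ages.put("dan", 18);
        System.out.println(formatTuple(Arrays.asList("john", 25)));            // (john,25)
        System.out.println(formatBag(Arrays.asList(Arrays.asList(1, 5, 18)))); // {(1,5,18)}
        System.out.println(formatMap(ages));                                   // [john#25,dan#18]
    }
}
```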

==== Changes to PigDump ====

There are a couple of formatting changes to !PigDump. First, the output will no 
longer contain spaces between the fields, as they are hard to tell apart from 
the now-supported NULL values when the data is read back in.

In the 0.1.0 version of Pig, the data would be produced in the following format:

john smith, 39, 3.81
bob jones, 22, 2.01

In 0.2.0, the same data will look like:

john smith,39,3.81
bob jones,22,2.01

Also, if the result of an operation is a long or a float, then the value will have 
`L` or `f` appended, respectively.

A = LOAD 'data';
B = group A all;
C = foreach B generate COUNT(A);
store C into 'output' using PigDump();

The result will be `100L`.
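
The suffix rule can be pictured with a few lines of Java. This is a hedged sketch with hypothetical names, covering only the `L` and `f` suffixes mentioned above; it is not taken from the !PigDump source.

```java
class DumpFormatDemo {
    // Append "L" to longs and "f" to floats; print everything else as-is.
    static String format(Object value) {
        if (value instanceof Long) {
            return value + "L";
        }
        if (value instanceof Float) {
            return value + "f";
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(format(100L));  // 100L
        System.out.println(format(3.81f)); // 3.81f
        System.out.println(format(39));    // 39
    }
}
```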

Since `PigDump` is used by the `dump` command, the changes above affect the `dump` command as well.

==== New UDFs ====

A `SIZE` function was added that can be used to get the size of any data type. It 
replaces `ARITY`, which is still valid but deprecated.  The following table
describes what `SIZE` returns for each type:
||bag||Number of tuples in the bag||
||tuple||Number of fields in the tuple||
||map||Number of keys in the map||
||chararray||Number of characters in the chararray||
||bytearray||Number of bytes in the bytearray||

A `CONCAT` function was added.  It concatenates two `chararray` or `bytearray` values.

==== COUNT and NULLs ====

In Pig 0.2.0, `COUNT` counts NULL values. This is compatible with the 
0.1.0 version, which did not recognize NULLs; however, it is not consistent 
with the SQL treatment of NULLs. We might choose to change that in later 
versions of the software, especially once we start supporting SQL. This also 
affects `AVG`, as it is computed as `SUM(A)/COUNT(A)`.
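
The consequence for `AVG` is easiest to see with numbers. The Java sketch below assumes, as described in this document, that the sum skips nulls while the count includes them. The class and method names are illustrative and are not Pig's builtins.

```java
import java.util.Arrays;
import java.util.List;

class CountNullDemo {
    // Counts every entry, including nulls.
    static long count(List<Double> values) {
        return values.size();
    }

    // Sums the non-null entries; returns null if there are none.
    static Double sum(List<Double> values) {
        double total = 0;
        boolean sawNonNull = false;
        for (Double v : values) {
            if (v == null) {
                continue;
            }
            total += v;
            sawNonNull = true;
        }
        if (sawNonNull) {
            return total;
        }
        return null;
    }

    // Average computed as sum / count, so nulls lower the result.
    static Double avg(List<Double> values) {
        Double s = sum(values);
        if (s == null) {
            return null;
        }
        return s / count(values);
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(2.0, null, 4.0);
        System.out.println(avg(values)); // 2.0, whereas SQL would report 3.0
    }
}
```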

== Incompatible Changes ==

This section describes the changes that require user attention in order to make 
their code and UDFs work with Pig 0.2.0.

=== Data Type Changes ===

==== DataAtom Class is Gone ====

This class is no longer available in Pig 0.2.0. Pig allows fields to use basic 
java types such as `String`, `Integer`, etc. directly.

Old UDF code:
public class TOLOWERCASE extends EvalFunc<DataAtom>

New UDF code:
public class TOLOWERCASE extends EvalFunc<String>

The following table describes which Java types are used to represent Pig types:
||'''Pig Type'''||'''Java Type'''||
||map||java.util.Map<java.lang.Object, java.lang.Object>||

For tuples and bags see below.

==== DataMap Class is Gone ====

`DataMap` has been removed and `Map<Object, Object>` is used instead.

Old UDF code:
public class URLPARSE extends EvalFunc<DataMap>

New UDF code:
public class URLPARSE extends EvalFunc<Map<Object, Object> >

==== Tuple Changes ====

In Pig 0.2.0, the `Tuple` class is no longer a concrete class but an interface. 
This means that tuples can't be directly instantiated but instead a `factory` 
class should be used to create one:

Tuple t =  TupleFactory.getInstance().newTuple();

If you know the number of fields in the tuple it is beneficial to provide that 
in the constructor.

Here is the complete `TupleFactory` interface:

public interface TupleFactory {
    public static TupleFactory getInstance() {}

    /**
     * Create an empty tuple.  This should be used as infrequently as
     * possible, use newTuple(int) instead.
     */
    public Tuple newTuple() {}

    /**
     * Create a tuple with size fields.  Whenever possible this is preferred
     * over the nullary constructor, as the constructor can preallocate the
     * size of the container holding the fields.  Once this is called, it
     * is legal to call Tuple.set(x, object), where x &lt; size.
     * @param size Number of fields in the tuple.
     */
    public Tuple newTuple(int size) {}

    /**
     * Create a tuple from the provided list of objects.
     * @param c List of objects to use as the fields of the tuple.
     */
    public Tuple newTuple(List c) {}

    /**
     * Create a tuple with a single element.  This is useful because of
     * the fact that bags (currently) only take tuples, we often end up
     * sticking a single element in a tuple in order to put it in a bag.
     * @param datum Datum to put in the tuple.
     */
    public Tuple newTuple(Object datum) {}

    /**
     * Return the actual class representing a tuple that the implementing
     * factory will be returning.  This is needed because hadoop (and
     * possibly other systems) we use need to know the exact class we will
     * be using for input and output.
     * @return Class that implements tuple.
     */
    public Class tupleClass() {}
}

The `Tuple` interface itself has changed significantly. Here is the new version:

public interface Tuple extends WritableComparable, Serializable {

    /**
     * Marker for indicating whether the value this object holds
     * is a null
     */
    public static byte NULL = 0x00;

    /**
     * Marker for indicating whether the value this object holds
     * is not a null
     */
    public static byte NOTNULL = 0x01;

    /**
     * Make this tuple reference the contents of another.  This method does not
     * copy the underlying data.   It maintains references to the data from the
     * original tuple (and possibly even to the data structure holding the data).
     * @param t Tuple to reference.
     */
    void reference(Tuple t);

    /**
     * Find the size of the tuple.  Used to be called arity().
     * @return number of fields in the tuple.
     */
    int size();

    /**
     * Find out if a given field is null.
     * @param fieldNum Number of field to check for null.
     * @return true if the field is null, false otherwise.
     * @throws ExecException if the field number given is greater
     * than or equal to the number of fields in the tuple.
     */
    boolean isNull(int fieldNum) throws ExecException;

    /**
     * Find the type of a given field.
     * @param fieldNum Number of field to get the type for.
     * @return type, encoded as a byte value.  The values are taken from
     * the class DataType.  If the field is null, then DataType.UNKNOWN
     * will be returned.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    byte getType(int fieldNum) throws ExecException;

    /**
     * Get the value in a given field.
     * @param fieldNum Number of the field to get the value for.
     * @return value, as an Object.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    Object get(int fieldNum) throws ExecException;

    /**
     * Get all of the fields in the tuple as a list.
     * @return List&lt;Object&gt; containing the fields of the tuple
     * in order.
     */
    List<Object> getAll();

    /**
     * Set the value in a given field.
     * @param fieldNum Number of the field to set the value for.
     * @param val Object to put in the indicated field.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    void set(int fieldNum, Object val) throws ExecException;

    /**
     * Append a field to a tuple.  This method is not efficient as it may
     * force copying of existing data in order to grow the data structure.
     * Whenever possible you should construct your Tuple with the
     * newTuple(int) method and then fill in the values with set(), rather
     * than construct it with newTuple() and append values.
     * @param val Object to append to the tuple.
     */
    void append(Object val);

    /**
     * Determine the size of tuple in memory.  This is used by data bags
     * to determine their memory size.  This need not be exact, but it
     * should be a decent estimation.
     * @return estimated memory size.
     */
    long getMemorySize();

    /**
     * Write a tuple of atomic values into a string.  All values in the
     * tuple must be atomic (no bags, tuples, or maps).
     * @param delim Delimiter to use in the string.
     * @return A string containing the tuple.
     * @throws ExecException if a non-atomic value is found.
     */
    String toDelimitedString(String delim) throws ExecException;

    /**
     * @return true if this Tuple is null
     */
    public boolean isNull();

    /**
     * @param isNull boolean indicating whether this tuple is null
     */
    public void setNull(boolean isNull);
}

==== DataBag Changes ====

Similarly to `Tuple`, `DataBag` is now also an interface and new bags are 
created via calls to `BagFactory`:

DataBag bag = BagFactory.getInstance().newDefaultBag();

Here is the `BagFactory` class:

public abstract class BagFactory {
    public static BagFactory getInstance() {
        ...
    }

    /**
     * Get a default (unordered, not distinct) data bag.
     */
    public abstract DataBag newDefaultBag();

    /**
     * Get a sorted data bag.
     * @param comp Comparator that controls how the data is sorted.
     * If null, default comparator will be used.
     */
    public abstract DataBag newSortedBag(Comparator<Tuple> comp);

    /**
     * Get a distinct data bag.
     */
    public abstract DataBag newDistinctBag();

    protected BagFactory() {
        gMemMgr = new SpillableMemoryManager();
    }

    protected void registerBag(DataBag b) {
        ...
    }
}


`DataBag` classes themselves are mainly unchanged. The full interface can be seen 

=== EvalFunc Changes ===

==== exec Signature Changes ====

In Pig 0.1.0, the `exec` function had the following signature:

public void exec(Tuple input, T output) throws IOException;

In 0.2.0, the signature looks as follows:

public T exec(Tuple input) throws IOException;

The change is to return the output value rather than passing the output in a 
parameter. This is done to allow a function to return a NULL value.

The contract with the UDF is that for fatal errors that should terminate the 
entire processing, the UDF should throw an exception. For non-fatal errors, it 
should return `null` that would be recorded as null value in the output.

Let's take a look at the `TOLOWERCASE` function that takes a string and generates a 
lower case version of it.  

In the 0.1.0 version of Pig, it looks as follows:

public class TOLOWERCASE extends EvalFunc<DataAtom> {
    public void exec(Tuple input, DataAtom output) throws IOException {
        String str = input.getAtomField(0).strval();
        output.setValue(str.toLowerCase());
    }
}

In Pig 0.2.0, the same function looks as follows:

1. public class TOLOWERCASE extends EvalFunc<String>
2. {
3.   public String exec(Tuple input) throws IOException {
4.        if (input == null || input.size() == 0)
5.            return null;
6.        String str;
7.        try {
8.            str = (String)input.get(0);
9.        } catch (Exception e){
10.          System.err.println("Can't convert field to a string; error = " + e.getMessage());
11.          return null;
12.      }
13.      return str.toLowerCase();
14.   }
15. }

There are a couple of things to note here:

   1. The initial check for a null or empty `Tuple` (lines 4-5) is needed for all 
UDFs that don't provide type-specific implementations, as discussed later.
   2. Data-related exceptions are caught and converted to a warning and a null 
return (lines 9-12). Note that, for now, the warning should go to `stderr`. In 
later versions, we will propagate a logger to the UDFs. As is the case with 
#1, converting exceptions to nulls is not needed in UDFs that provide type-specific implementations, as discussed later.

==== Input Data Changes ====

In the 0.1.0 version of Pig, the data was always propagated to the UDFs in a 
form of strings. Now the default type of the data is `bytearray`. The main 
reason for this is that treating non-ascii data as string can corrupt the data. 
The other one is cost of converting data to a string.

As a result, if a UDF expects data of a particular type, the script needs to 
make sure to pass the data to it in that type. This can be done by either 
declaring the correct data type in the `AS` clause of the `LOAD` statement or 
by explicitly casting the data going to the UDF to the required type.

A = LOAD 'data';
B = foreach A generate TOLOWERCASE($1);


A = LOAD 'data' as (name: chararray);
B = foreach A generate TOLOWERCASE(name);

==== outputSchema changes ====

If a UDF does not override the `outputSchema` function, the output schema is 
assumed to be a single value of `chararray` type. This is compatible with the
0.1.0 version of Pig. '''If a UDF returns a value other than string, it will 
work correctly without specifying the schema but will force extra data 
conversion and thus be inefficient.'''

Here is an example of `SIZE` UDF:

public class SIZE extends EvalFunc<Long> {
    // exec() omitted
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
    }
}

The UDF declares that its output is of type `long`.

==== Type Specific Functions ====

In the 0.1.0 version of Pig, since type information was not available, each UDF 
had to choose the type in which to perform computations. Most generic
functions, like the Pig builtins, chose to use Double for arithmetic computations. 
With Pig 0.2.0, type specific functions can be written to provide for more 
efficient computation. Below is the example of the transformation of SUM 

Old code with some details omitted:

public class SUM extends EvalFunc<DataAtom> implements Algebraic {
    public void exec(Tuple input, DataAtom output) throws IOException {
        output.setValue(sum(input));
    }

    static protected double sum(Tuple input) throws IOException {
        // details omitted
    }
}

New code with some details omitted:

1.  public class SUM extends EvalFunc<Double> implements Algebraic {
2.      public Double exec(Tuple input) throws IOException {
3.          try {
4.              return sum(input);
5.       } catch (ExecException ee) {
6.              IOException oughtToBeEE = new IOException();
7.              oughtToBeEE.initCause(ee);
8.              throw oughtToBeEE;
10.     }
11. }
12.  static protected Double sum(Tuple input) throws ExecException {
13.      DataBag values = (DataBag)input.get(0);

14.      // if we were handed an empty bag, return NULL (compatible with SQL)
15.      if(values.size() == 0) {
16.            return null;
17.      }
18.      double sum = 0;
19.      boolean sawNonNull = false;
20.      for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
21.          Tuple t = it.next();
22.          try {
23.                Double d = DataType.toDouble(t.get(0));
24.                if (d == null) continue;
25.                sawNonNull = true;
26.                sum += d;
27.          }catch(NumberFormatException nfe){
28.               // do nothing - essentially treat this particular input as null
29.          }catch(RuntimeException exp) {
30.               ExecException newE =  new ExecException("Error processing: " +
31.               t.toString() + exp.getMessage(), exp);
32.               throw newE;
33.          }
34.      }
35.      if(sawNonNull) {
36.          return new Double(sum);
37.      } else {
38.          return null;
39.      }
40.    }

41.   public Schema outputSchema(Schema input) {
42.        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
43.   }

44.   public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
45.        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
46.        funcList.add(new FuncSpec(this.getClass().getName(),
               Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
47.        funcList.add(new FuncSpec(DoubleSum.class.getName(),
               Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
48.        funcList.add(new FuncSpec(FloatSum.class.getName(),
               Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
49.        funcList.add(new FuncSpec(IntSum.class.getName(),
               Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
50.        funcList.add(new FuncSpec(LongSum.class.getName(),
               Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
51.        return funcList;
52.    }
53. }

The main thing to notice here is the `getArgToFuncMapping` method (lines 
44-52), which allows different classes to be specified for handling different 
input types. (The nested schema is needed because the input in this case is a 
bag of tuples, not just a tuple.) The main class handles the default case of 
`bytearray` by converting the data to `double` (line 23). This ensures backward 
compatibility. Note that it treats invalid data as null. It also sets the 
output schema to `double` (lines 41-43).
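
The selection that `getArgToFuncMapping` makes possible can be pictured with a 
small self-contained sketch. It does not use Pig's classes: 
`TypeDispatchSketch`, `InputType`, and `chooseImplementation` are hypothetical 
names invented for illustration, and Pig's real matching works on schemas 
rather than on an enum. Only the pairing of element types with class names is 
taken from the listing above.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for type-based function selection; none of this is Pig API.
class TypeDispatchSketch {
    // Element types that a bag passed to SUM might hold.
    enum InputType { BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG }

    // Mirrors the pairs registered in getArgToFuncMapping.
    static final Map<InputType, String> MAPPING = new EnumMap<>(InputType.class);
    static {
        MAPPING.put(InputType.BYTEARRAY, "SUM");
        MAPPING.put(InputType.DOUBLE, "DoubleSum");
        MAPPING.put(InputType.FLOAT, "FloatSum");
        MAPPING.put(InputType.INTEGER, "IntSum");
        MAPPING.put(InputType.LONG, "LongSum");
    }

    // Returns the name of the class registered for the given element type.
    static String chooseImplementation(InputType elementType) {
        return MAPPING.get(elementType);
    }

    public static void main(String[] args) {
        for (InputType t : InputType.values()) {
            System.out.println(t + " -> " + chooseImplementation(t));
        }
    }
}
```

In this sketch untyped (`bytearray`) input falls back to the generic class, 
while every declared numeric type is routed to its own implementation.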

Below is an example of one of the type-specific classes; it handles `int` input:

1. public class IntSum extends EvalFunc<Long> implements Algebraic {
2.     public Long exec(Tuple input) throws IOException {
3.       try {
4.          return sum(input);
5.       } catch (ExecException ee) {
6.          IOException oughtToBeEE = new IOException();
7.          oughtToBeEE.initCause(ee);
8.          throw oughtToBeEE;
9.       }
10.   }
11.   static protected  Long sum(Tuple input) throws ExecException {
12.        DataBag values = (DataBag)input.get(0);
13.        if(values.size() == 0) {
14.            return null;
15.        }
16.        long sum = 0;
17.        boolean sawNonNull = false;
18.        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
19.            Tuple t = (Tuple)it.next();
20.            try {
21.                Integer i = (Integer)(t.get(0));
22.                if (i == null) continue;
23.                sawNonNull = true;
24.                sum += i;
25.            }catch(RuntimeException exp) {
26.                ExecException newE =  new ExecException("Error processing: " +
27.                t.toString() + exp.getMessage(), exp);
28.                throw newE;
29.           }
30.        }
31.        if(sawNonNull) {
32.            return new Long(sum);
33.        } else {
34.            return null;
35.        }
36.    }
37.    public Schema outputSchema(Schema input) {
38.        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
39.    }    
40. }

There are a couple of things to notice here:

   1. Unlike the default class, this class does not convert the data to the 
specified type; instead, it expects the data to already be in that type (line 21). 
It also does not check for null/empty input. This is because Pig takes care of 
this by casting the inputs to the appropriate type and handing null values if 
   2. The output schema is set to indicate that the output type is `long` 
(lines 37-39).
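
The summing rule used by `IntSum` (accumulate `int` values into a `long`, skip 
nulls, and return null when nothing was summed) can be tried outside of Pig 
with a sketch like the following. `NullAwareSumSketch` is a hypothetical class 
that works on a plain `List<Integer>` instead of a `DataBag`; it is an 
illustration of the rule, not the Pig implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Self-contained illustration of the IntSum summing rule; not Pig code.
class NullAwareSumSketch {
    // Returns null for an empty list or for a list that holds only nulls.
    static Long sum(List<Integer> values) {
        if (values.isEmpty()) {
            return null;
        }
        long sum = 0;                 // long accumulator avoids int overflow
        boolean sawNonNull = false;
        for (Integer i : values) {
            if (i == null) continue;  // null inputs do not contribute
            sawNonNull = true;
            sum += i;
        }
        return sawNonNull ? Long.valueOf(sum) : null;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(Integer.MAX_VALUE, 1)));
        System.out.println(sum(Arrays.asList(1, null, 2)));
        System.out.println(sum(Collections.<Integer>emptyList()));
    }
}
```

Returning `Long` rather than `Integer` is what lets the first call in `main` 
produce a value larger than `Integer.MAX_VALUE` instead of wrapping around.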

==== Report Progress ====

Part of the Hadoop infrastructure's job is to detect hanging jobs and kill 
them. To avoid being killed, a program needs to report its progress 
periodically. Pig's infrastructure takes care of this, and most UDFs do not 
need to report progress. However, a function that takes a very long time (more 
than 5 minutes) to process a single `Tuple` needs to report progress itself.

It can do so by periodically calling `reporter.progress()` during its 
execution:

public class MyUDF extends EvalFunc<String> {
     public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        // do some processing
        reporter.progress();
        // some more processing
        // ... compute and return the result (omitted)
     }
}
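
For a UDF whose work on one tuple is a long loop, one common pattern is to 
report every fixed number of iterations rather than on every iteration. The 
sketch below shows that pattern in isolation. `ProgressReporter`, 
`ProgressSketch`, and the `interval` parameter are hypothetical stand-ins 
introduced for illustration; in a real UDF the call would go to the `reporter` 
mentioned above.

```java
// Hypothetical reporter interface standing in for the one available to a UDF.
interface ProgressReporter {
    void progress();
}

class ProgressSketch {
    // Runs `steps` units of work and reports after every `interval` units.
    // Returns how many times progress was reported.
    static int process(int steps, int interval, ProgressReporter reporter) {
        int reports = 0;
        for (int step = 1; step <= steps; step++) {
            // ... one unit of expensive work would go here ...
            if (step % interval == 0) {
                reporter.progress();
                reports++;
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        int reports = process(10, 3, () -> System.out.println("still working"));
        System.out.println("reported " + reports + " times");
    }
}
```

The interval is a trade-off: it should be small enough that a report is sent 
well within the timeout, and large enough that reporting does not dominate the 
cost of the loop.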

=== Load/Store function changes ===

A couple of changes in Pig 0.2.0 affect load/store functions.

First, since `Tuple` is now an interface, the loader can't rely on read/write 
functions of the `Tuple` class to load/store the data as it would be 
implementation specific. Instead, each load/store function needs to provide its 
own implementation.

Second, load and store functions need to be type-aware. The load function is 
expected to do two things: (1) produce a tuple with fields of `bytearray` type 
and (2) provide conversion routines from `bytearray` to all other types (unless 
it overrides `determineSchema`; for more on this see below). There are a 
couple of reasons to separate the two. First, since the user is not required 
to provide a schema, Pig treats the data as `bytearray` in the absence of one. 
Second, lazy conversion is likely to be more efficient, since Pig might not 
need to convert all of the data, for instance, if some of it is thrown away by 
a filter or projected out by a generate. For a store function, being type-aware 
means performing conversions from the real type of the data to the format in 
which it should be stored.
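
The benefit of lazy conversion can be illustrated with a minimal sketch that 
keeps a field as raw bytes and parses it only when a typed value is first 
requested. `LazyFieldSketch` and its `conversions` counter are hypothetical and 
exist only for this illustration; Pig's own mechanism relies on the conversion 
routines supplied by the load function.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical field wrapper, not part of Pig: holds raw bytes, converts on demand.
class LazyFieldSketch {
    private final byte[] raw;
    private boolean converted = false;
    private Integer cached;      // result of the first conversion
    int conversions = 0;         // counts how often parsing actually ran

    LazyFieldSketch(String text) {
        raw = text.getBytes(StandardCharsets.UTF_8);
    }

    // Untyped access costs nothing beyond returning the stored bytes.
    byte[] bytes() {
        return raw;
    }

    // Typed access parses once; invalid data is treated as null.
    Integer asInt() {
        if (!converted) {
            converted = true;
            conversions++;
            try {
                cached = Integer.valueOf(new String(raw, StandardCharsets.UTF_8));
            } catch (NumberFormatException nfe) {
                cached = null;
            }
        }
        return cached;
    }

    public static void main(String[] args) {
        LazyFieldSketch f = new LazyFieldSketch("42");
        System.out.println(f.conversions);  // nothing parsed yet
        System.out.println(f.asInt());
    }
}
```

A field that is filtered out or projected away before `asInt()` is ever called 
is never parsed, which is the saving the paragraph above describes.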

Below is an example of how !PigStorage looks now (with some details omitted):

1. public class PigStorage extends Utf8StorageConverter implements 
ReversibleLoadStoreFunc {
2. public Tuple getNext() throws IOException {
3.     if (in == null || in.getPosition() > end) {
4.          return null;
5.     }
6.     if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
7.     mBuf.reset();
8.     while (true) {
9.        // Hadoop's FSDataInputStream (which my input stream is based
10.      // on at some point) is buffered, so I don't need to buffer.
11.      int b = in.read();
12.      if (b == fieldDel) {
13.          readField();
14.      } else if (b == recordDel) {
15.          readField();
16.          Tuple t =  mTupleFactory.newTuple(mProtoTuple);
17.          mProtoTuple.clear();
18.          return t;
19.      } else if (b == -1) {
20.           // hit end of file
21.           return null;
22.      } else {
23.            mBuf.write(b);
25.      }
26.   }
27. }
28. public void putNext(Tuple f) throws IOException {
29.     // I have to convert integer fields to string, and then to bytes.
30.     // If I use a DataOutputStream to convert directly from integer to
31.     // bytes, I don't get a string representation.
32.     int sz = f.size();
33.     for (int i = 0; i < sz; i++) {
34.         Object field;
35.         try {
36.             field = f.get(i);
37.         } catch (ExecException ee) {
38.             throw new IOException(ee);
39.         }
40.         switch (DataType.findType(field)) {
41.             case DataType.NULL:
42.                 break; // just leave it empty
43.             case DataType.BOOLEAN:
44.                 mOut.write(((Boolean)field).toString().getBytes());
45.                 break;
46.  }
47.  public Schema determineSchema(URL fileName) throws IOException {
48.     return null;
49.  }
50.  public void fieldsToRead(Schema schema) {
        // do nothing
51.  }
52. }

There are several things to note here:

   1. Use of `TupleFactory` (line 16) discussed earlier
   2. Type-specific handling of writing tuples (lines 40-45)
   3. Presence of `determineSchema` function.
   4. Presence of `fieldsToRead` function. This is again for future use and 
should be left empty in Pig 0.2.0.
   5. Extension of the `Utf8StorageConverter` class (line 1). This is the class 
that performs conversion of UTF8 strings into Pig types, and it can be used by 
other loaders that work with data stored in the same format. In the future 
we will also provide a similar class for data stored in a binary format.

abstract public class Utf8StorageConverter {
    public DataBag bytesToBag(byte[] b) throws IOException {
        Object o;
        try {
            o = parseFromBytes(b);
        } catch (ParseException pe) {
            throw new IOException(pe.getMessage());
        }
        return (DataBag)o;
    }

    public String bytesToCharArray(byte[] b) throws IOException {
        return new String(b);
    }

    // remaining conversion routines omitted
}

==== Use of determineSchema ====
Some load functions may know the type of data they need to create.  For 
example, a loader loading JSON data would know the type of each datum as it 
reads it.  If this is the case, the user need not declare the types of the 
data in order to use types.  Instead, the load function can implement the 
`determineSchema` method.  The parser calls this method when parsing the Pig 
Latin script, and hence can get the schema of the data from the load function.  

One important difference for load functions implementing `determineSchema` is 
that Pig does not do lazy type conversion in this situation.  Instead, it
expects the load function to do the type conversion at load time.

=== Operational Changes ===

In Pig 0.1.0, jobs that did not require a reduce phase wrote their output to 
files named `map-XXXXX`. With 0.2.0, the data is stored in files named 
`part-XXXXX`. The map-only queries are the ones that don't contain 
