Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by OlgaN:
http://wiki.apache.org/pig/UDFManual

------------------------------------------------------------------------------
  Eval is the most common type of function. It can be used in `FOREACH` 
statements as shown in this script:
  
  {{{#!java
- -- this is myscript.pig
+ -- myscript.pig
  REGISTER myudfs.jar;
  A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
  B = FOREACH A GENERATE myudfs.UPPER(name);
@@ -42, +42 @@

  === How to Write a Simple Eval Function ===
  
  Let's now look at the implementation of the `UPPER` UDF.
- {{{
+ {{{#!java
- 1. package myudfs;
+ package myudfs;
- 2. import java.io.IOException;
+ import java.io.IOException;
- 3. import org.apache.pig.EvalFunc;
+ import org.apache.pig.EvalFunc;
- 4. import org.apache.pig.data.Tuple;
+ import org.apache.pig.data.Tuple;
- 5 import org.apache.pig.impl.util.WrappedIOException;
+ import org.apache.pig.impl.util.WrappedIOException;
  
- 6. public class UPPER extends EvalFunc<String>
+ public class UPPER extends EvalFunc<String>
- 7. {
+ {
- 8.      public String exec(Tuple input) throws IOException {
+     public String exec(Tuple input) throws IOException {
- 9.          if (input == null || input.size() == 0)
+         if (input == null || input.size() == 0)
- 10.              return null;
+             return null;
- 11.        try{
+         try{
- 12.            String str = (String)input.get(0);
+             String str = (String)input.get(0);
- 13.            return str.toUpperCase();
+             return str.toUpperCase();
- 14.        }catch(Exception e){
+         }catch(Exception e){
- 15.            throw WrappedIOException.wrap("Caught exception processing 
input row ", e);
+             throw WrappedIOException.wrap("Caught exception processing input 
row ", e);
- 16.        }
+         }
- 17.     }
- 18. }
+     }
+ }
  }}}
  
  The first line indicates that the function is part of the `myudfs` package. 
The UDF class extends the `EvalFunc` class which is the base class for all eval 
functions. It is parameterized with the return type of the UDF which is a Java 
`String` in this case. We will look into the `EvalFunc` class in more detail 
later, but for now all we need to do is to implement the `exec` function. This 
function is invoked on every input tuple. The input into the function is a 
tuple with input parameters in the order they are passed to the function in the 
Pig script. In our example, it will contain a single string field corresponding 
to the student name.
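The per-tuple invocation model described above can be sketched without any Pig dependencies. The class and method names below are illustrative stand-ins, not Pig's API: `execUpper` mirrors the `exec` logic of `UPPER`, and the loop plays the role of Pig calling it once for each input tuple.

```java
import java.util.List;

// Standalone sketch of the exec contract: called once per input tuple,
// fields arrive in the order they were passed in the Pig script.
public class ExecModelSketch {

    // Mirrors UPPER.exec: null/empty tuple -> null, else uppercase field 0.
    public static String execUpper(List<Object> tuple) {
        if (tuple == null || tuple.isEmpty()) return null;
        return ((String) tuple.get(0)).toUpperCase();
    }

    public static void main(String[] args) {
        // Plays the role of FOREACH A GENERATE myudfs.UPPER(name):
        // one call per tuple, each tuple holding the single name field.
        for (List<Object> tuple : List.of(List.<Object>of("alice"), List.<Object>of("bob"))) {
            System.out.println(execUpper(tuple));
        }
    }
}
```

Running `main` prints the uppercased name once per tuple, which is exactly the shape of output produced by the `FOREACH` in `myscript.pig`.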
@@ -95, +95 @@

  
  Aggregate functions are another common type of eval function. Aggregate 
functions are usually applied to grouped data, as shown in this script:
  
- {{{
+ {{{#!java
- -- this is myscript2.pig
+ -- myscript2.pig
  A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
  B = GROUP A BY name;
  C = FOREACH B GENERATE group, COUNT(A);
@@ -109, +109 @@

  
  It is very important for performance to make sure that aggregate functions that are algebraic are implemented as such. Let's look at the implementation of the `COUNT` function to see what this means. (Error handling and some other code is omitted to save space. The full code can be accessed [http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?view=markup here].)
  
- {{{
+ {{{#!java
  public class COUNT extends EvalFunc<Long> implements Algebraic{
      public Long exec(Tuple input) throws IOException {return count(input);}
      public String getInitial() {return Initial.class.getName();}
@@ -141, +141 @@

  }
  }}}
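To see why the algebraic decomposition matters, here is a standalone sketch, with no Pig or Hadoop dependencies and illustrative names, of counting in three stages: an initial step that maps each tuple to the partial count 1, and intermediate/final steps that just sum partial counts. Because the intermediate step can run in the combiner on each map task, far less data crosses the network than if every tuple were shipped to the reducer.

```java
import java.util.List;

// Standalone sketch of the three algebraic stages behind COUNT.
public class AlgebraicCountSketch {

    // Initial stage: each input tuple contributes the partial count 1.
    public static long initial(Object tuple) { return 1L; }

    // Intermediate and Final stages: both just sum partial counts,
    // so the same code serves for either.
    public static long sum(List<Long> partials) {
        long total = 0;
        for (long p : partials) total += p;
        return total;
    }

    public static void main(String[] args) {
        // Two map tasks each run Initial then the combiner on their slice...
        long part1 = sum(List.of(initial("a"), initial("b")));
        long part2 = sum(List.of(initial("c")));
        // ...and the reducer's Final stage merges the partial counts.
        System.out.println(sum(List.of(part1, part2))); // prints 3
    }
}
```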
  `COUNT` implements the `Algebraic` interface, which looks like this:
- {{{
+ {{{#!java
  public interface Algebraic{
      public String getInitial();
      public String getIntermed();
@@ -162, +162 @@

  
  The example below uses the `IsEmpty` builtin filter function to implement joins.
  
- {{{
+ {{{#!java
  -- inner join
  A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
  B = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);
@@ -175, +175 @@

  
  Note that, even if filtering is omitted, the same results will be produced because the `foreach` result is a cross product and cross products get rid of empty bags. However, doing up-front filtering is more efficient since it reduces the input to the cross product.
  
- {{{
+ {{{#!java
  -- full outer join
  A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
  B = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);
@@ -185, +185 @@

  }}}
  
  The implementation of the `IsEmpty` function looks like this:
- {{{
+ {{{#!java
  import java.io.IOException;
  import java.util.Map;
  import org.apache.pig.FilterFunc;
@@ -237, +237 @@

  
  The builtin `TOKENIZE` function shows how bags and tuples are created. The function takes a text string as input and returns a bag of words from the text. (Note that currently Pig bags always contain tuples.)
  
- {{{
+ {{{#!java
- 1.  package org.apache.pig.builtin;
+ package org.apache.pig.builtin;
  
- 2.  import java.io.IOException;
+ import java.io.IOException;
- 3.  import java.util.StringTokenizer;
+ import java.util.StringTokenizer;
- 4.  import org.apache.pig.EvalFunc;
+ import org.apache.pig.EvalFunc;
- 5.  import org.apache.pig.data.BagFactory;
+ import org.apache.pig.data.BagFactory;
- 6.  import org.apache.pig.data.DataBag;
+ import org.apache.pig.data.DataBag;
- 7.  import org.apache.pig.data.Tuple;
+ import org.apache.pig.data.Tuple;
- 8.  import org.apache.pig.data.TupleFactory;
+ import org.apache.pig.data.TupleFactory;
  
- 9.  public class TOKENIZE extends EvalFunc<DataBag> {
+ public class TOKENIZE extends EvalFunc<DataBag> {
- 10.TupleFactory mTupleFactory = TupleFactory.getInstance();
+     TupleFactory mTupleFactory = TupleFactory.getInstance();
- 11.BagFactory mBagFactory = BagFactory.getInstance();
+     BagFactory mBagFactory = BagFactory.getInstance();
  
- 12.   public DataBag exec(Tuple input) throws IOException {
+     public DataBag exec(Tuple input) throws IOException {
- 13.        try {
+         try {
- 14.            DataBag output = mBagFactory.newDefaultBag();
+             DataBag output = mBagFactory.newDefaultBag();
- 15.            Object o = input.get(0);
+             Object o = input.get(0);
- 16.            if (!(o instanceof String)) {
+             if (!(o instanceof String)) {
- 17.                throw new IOException("Expected input to be chararray, but 
 got " + o.getClass().getName());
+                 throw new IOException("Expected input to be chararray, but got " + o.getClass().getName());
- 18.            }
+             }
- 19.            StringTokenizer tok = new StringTokenizer((String)o, " 
\",()*", false);
+             StringTokenizer tok = new StringTokenizer((String)o, " \",()*", 
false);
- 20.            while (tok.hasMoreTokens()) 
output.add(mTupleFactory.newTuple(tok.nextToken()));
+             while (tok.hasMoreTokens()) 
output.add(mTupleFactory.newTuple(tok.nextToken()));
- 21.            return output;
+             return output;
- 22.        } catch (ExecException ee) {
+         } catch (ExecException ee) {
- 23.            // error handling goes here
+             // error handling goes here
- 24.        }
+         }
- 25.    }
+     }
- 26. }
+ }
  }}}
  
  The two factory fields, `mTupleFactory` and `mBagFactory`, create tuple and bag factories respectively. (A factory is a class that creates objects of a particular type. For more details, see the definition of [http://en.wikipedia.org/wiki/Factory_pattern the factory pattern]. The factory class itself is implemented as a singleton to guarantee that the same factory is used everywhere. For more details, see the definition of [http://en.wikipedia.org/wiki/Singleton_pattern the singleton pattern].)
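The factory-as-singleton arrangement can be sketched in a few lines. The class below is purely illustrative and much simpler than Pig's real `TupleFactory`/`BagFactory`, but it shows the key property: every caller shares the same factory instance, while each call to the factory hands back a fresh object.

```java
// Minimal singleton-factory sketch (illustrative names, not Pig's classes).
public class SingletonFactorySketch {
    // The one shared instance, created eagerly at class-load time.
    private static final SingletonFactorySketch INSTANCE = new SingletonFactorySketch();

    // Private constructor prevents callers from making extra factories.
    private SingletonFactorySketch() {}

    public static SingletonFactorySketch getInstance() { return INSTANCE; }

    // The factory method: each call produces a brand-new object.
    public StringBuilder newBuffer() { return new StringBuilder(); }

    public static void main(String[] args) {
        SingletonFactorySketch f1 = SingletonFactorySketch.getInstance();
        SingletonFactorySketch f2 = SingletonFactorySketch.getInstance();
        System.out.println(f1 == f2);                         // same factory everywhere
        System.out.println(f1.newBuffer() == f2.newBuffer()); // but distinct products
    }
}
```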
@@ -282, +282 @@

  
  Let's assume that we have a UDF `Swap` that, given a tuple with two fields, swaps their order. Let's also assume that the UDF does not specify a schema, and look at the scripts below:
  
- {{{
+ {{{#!java
- 1. register myudfs.jar;
+ register myudfs.jar;
- 2. A = load 'student_data' as (name: chararray, age: int, gpa: float);
+ A = load 'student_data' as (name: chararray, age: int, gpa: float);
- 3. B = foreach A generate flatten(myudfs.Swap(name, age)), gpa;
+ B = foreach A generate flatten(myudfs.Swap(name, age)), gpa;
- 4. C = foreach B generate $2;
+ C = foreach B generate $2;
- 5. D = limit B 20;
+ D = limit B 20;
- 6. dump D;
+ dump D;
  }}}
  
  This script will result in the following error, caused by the line `C = foreach B generate $2;`.
@@ -300, +300 @@

  
  The function, including the schema, looks like this:
  
- {{{
+ {{{#!java
  package myudfs;
  import java.io.IOException;
  import org.apache.pig.EvalFunc;
@@ -344, +344 @@

  
  The second parameter to the `FieldSchema` constructor is the schema 
representing this field, which in this case is a tuple with two fields. The 
third parameter represents the type of the schema, which in this case is a 
`TUPLE`. All supported schema types are defined in the 
`org.apache.pig.data.DataType` class.
  
- {{{
+ {{{#!java
  public class DataType {
      public static final byte UNKNOWN   =   0;
      public static final byte NULL      =   1;
@@ -368, +368 @@

  
  The example above shows how to create an output schema for a tuple. Doing 
this for a bag is very similar. Let's extend the `TOKENIZE` function to do that:
  
- {{{
+ {{{#!java
  package org.apache.pig.builtin;
  
  import java.io.IOException;
@@ -434, +434 @@

  
  This example shows the implementation of the `ABS` function that returns the 
absolute value of a numeric value passed to it as input.
  
- {{{
+ {{{#!java
  import java.io.IOException;
  import java.util.List;
  import java.util.ArrayList;
@@ -475, +475 @@

  
  The main thing to notice in this example is the `getArgToFuncMapping()` method. This method returns a list that contains a mapping from the input schema to the class that should be used to handle it. In this example the main class handles the `bytearray` input and outsources the rest of the work to other classes implemented in separate files in the same package. An example of one such class is shown below. This class handles integer input values.
  
- {{{
+ {{{#!java
  import java.io.IOException;
  import org.apache.pig.impl.util.WrappedIOException;
  import org.apache.pig.EvalFunc;
@@ -502, +502 @@

  
  For instance, let's consider the function `MAX`, which is part of the `piggybank` described later in this document. Given two values, the function returns the larger value. The function table for `MAX` looks like this:
  
- {{{
+ {{{#!java
  public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
      List<FuncSpec> funcList = new ArrayList<FuncSpec>();
      Util.addToFunctionList(funcList, IntMax.class.getName(), 
DataType.INTEGER);
@@ -518, +518 @@

  
  Let's now see how this function can be used in a Pig script:
  
- {{{
+ {{{#!java
  REGISTER piggybank.jar
  A = LOAD 'student_data' AS (name: chararray, gpa1: float, gpa2: double);
  B = FOREACH A GENERATE name, 
org.apache.pig.piggybank.evaluation.math.MAX(gpa1, gpa2);
@@ -527, +527 @@

  
  In this example, the function gets one parameter of type `float` and another 
of type `double`. The best fit will be the function that takes two double 
values. Pig makes this choice on the user's behalf by inserting implicit casts 
for the parameters. Running the script above is equivalent to running the 
script below:
  
- {{{
+ {{{#!java
  A = LOAD 'student_data' AS (name: chararray, gpa1: float, gpa2: double);
  B = FOREACH A GENERATE name, 
org.apache.pig.piggybank.evaluation.math.MAX((double)gpa1, gpa2);
  DUMP B;
@@ -537, +537 @@

  
  Let's revisit the `UPPER` function from our first example. As it is written 
now, it would only work if the data passed to it is of type `chararray`. To 
make it work with data whose type is not explicitly set, a function table with 
a single entry needs to be added:
  
- {{{
+ {{{#!java
  package myudfs;
  import java.io.IOException;
  import org.apache.pig.EvalFunc;
@@ -566, +566 @@

  
  Now the following script will run:
  
- {{{
+ {{{#!java
  -- this is myscript.pig
  REGISTER myudfs.jar;
  A = LOAD 'student_data' AS (name, age, gpa);
@@ -583, +583 @@

  
  For instance, the `UPPER` function would now look as follows:
  
- {{{
+ {{{#!java
  public class UPPER extends EvalFunc<String>
  {
          public String exec(Tuple input) throws IOException {
@@ -609, +609 @@

  === Load Functions ===
  Every load function needs to implement the `LoadFunc` interface. An abbreviated version is shown below. The full definition can be seen [http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/LoadFunc.java?view=markup here].
  
- {{{
+ {{{#!java
  public interface LoadFunc {
      public void bindTo(String fileName, BufferedPositionedInputStream is, 
long offset, long end) throws IOException;
      public Tuple getNext() throws IOException;
@@ -632, +632 @@

  
  Next is a set of conversion routines that convert data from `bytearray` to the requested type. This requires further explanation. By default, we would like the loader to do as little per-tuple processing as possible, because many tuples can be thrown out during filtering or joins. Also, many fields might not get used because they get projected out. If the data needs to be converted into another form, we would like this conversion to happen as late as possible. The majority of loaders should return the data as bytearrays, and Pig will request a conversion from bytearray to the actual type when needed. Let's look at the example below:
  
- {{{
+ {{{#!java
  A = load 'student_data' using PigStorage() as (name: chararray, age: int, 
gpa: float);
  B = filter A by age > 25;
  C = foreach B generate name;
@@ -653, +653 @@

  
  Here is an example of the function as implemented by `BinStorage`:
  
- {{{
+ {{{#!java
  public Schema determineSchema(String fileName, ExecType execType, DataStorage 
storage) throws IOException {
      InputStream is = FileLocalizer.open(fileName, execType, storage);
      bindTo(fileName, new BufferedPositionedInputStream(is), 0, 
Long.MAX_VALUE);
@@ -682, +682 @@

  
  All store functions need to implement the `StoreFunc` interface:
  
- {{{
+ {{{#!java
  public interface StoreFunc {
      public abstract void bindTo(OutputStream os) throws IOException;
      public abstract void putNext(Tuple f) throws IOException;
@@ -694, +694 @@

  
  Here is an example of a simple store function that writes data as a string 
returned from the `toString` function.
  
- {{{
+ {{{#!java
  public class StringStore implements StoreFunc {
      OutputStream os;
      private byte recordDel = (byte)'\n';
@@ -763, +763 @@

  
  The slicer has two basic functions: validate input and slice up the input. 
Both of these methods will be called on the client machine. 
  
- {{{
+ {{{#!java
  public interface Slicer {
      void validate(DataStorage store, String location) throws IOException;
      Slice[] slice(DataStorage store, String location) throws IOException;
@@ -777, +777 @@

  
  Each slice describes a unit of work and will correspond to a map task in 
Hadoop.
  
- {{{
+ {{{#!java
  public interface Slice extends Serializable {
      String[] getLocations();
      void init(DataStorage store) throws IOException;
@@ -799, +799 @@

  
  This example shows a simple `Slicer` that gets a count from the input stream and generates that number of `Slice`s.
  
- {{{
+ {{{#!java
  public class RangeSlicer implements Slicer {
      /**
       * Expects location to be a Stringified integer, and makes
