Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The following page has been changed by OlgaN:
http://wiki.apache.org/pig/UDFManual

------------------------------------------------------------------------------

Eval is the most common type of function. It can be used in `FOREACH` statements as shown in this script:

{{{#!java
-- myscript.pig
REGISTER myudfs.jar;
A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
B = FOREACH A GENERATE myudfs.UPPER(name);
}}}

=== How to Write a Simple Eval Function ===

Let's now look at the implementation of the `UPPER` UDF.

{{{#!java
package myudfs;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;

public class UPPER extends EvalFunc<String>
{
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            String str = (String)input.get(0);
            return str.toUpperCase();
        } catch (Exception e) {
            throw WrappedIOException.wrap("Caught exception processing input row ", e);
        }
    }
}
}}}

The first line indicates that the function is part of the `myudfs` package. The UDF class extends the `EvalFunc` class, which is the base class for all eval functions. It is parameterized with the return type of the UDF, a Java `String` in this case. We will look at the `EvalFunc` class in more detail later; for now, all we need to do is implement the `exec` function. This function is invoked on every input tuple. The input to the function is a tuple whose fields hold the parameters in the order they are passed to the function in the Pig script. In our example, it will contain a single string field corresponding to the student name.
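The `exec` contract is easy to exercise outside of Pig as well. Below is a minimal, hypothetical test harness (not part of the manual's code; the class name is illustrative), assuming `pig.jar` and `myudfs.jar` are on the classpath:

{{{#!java
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class UpperTest {
    public static void main(String[] args) throws Exception {
        // Build the one-field tuple that Pig would pass for UPPER(name).
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "john smith");
        // Invoke the UDF directly; prints JOHN SMITH.
        System.out.println(new myudfs.UPPER().exec(t));
    }
}
}}}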
Aggregate functions are another common type of eval function. Aggregate functions are usually applied to grouped data, as shown in this script:

{{{#!java
-- myscript2.pig
A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
B = GROUP A BY name;
C = FOREACH B GENERATE group, COUNT(A);
}}}

It is very important for performance to make sure that aggregate functions that are algebraic are implemented as such. Let's look at the implementation of the `COUNT` function to see what this means. (Error handling and some other code is omitted to save space. The full code can be accessed [http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?view=markup here].)

{{{#!java
public class COUNT extends EvalFunc<Long> implements Algebraic {
    public Long exec(Tuple input) throws IOException {return count(input);}
    public String getInitial() {return Initial.class.getName();}
    ...
}
}}}

`COUNT` implements the `Algebraic` interface, which looks like this:

{{{#!java
public interface Algebraic {
    public String getInitial();
    public String getIntermed();
    public String getFinal();
}
}}}
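To make the algebraic contract concrete, here is a small, self-contained illustration (plain Java, not Pig's actual execution code) of why an algebraic `COUNT` can safely run in Hadoop's combiner: partial counts can be combined in any grouping and still produce the same total.

{{{#!java
import java.util.Arrays;
import java.util.List;

public class AlgebraicCountDemo {
    public static void main(String[] args) {
        // Initial stage: each input tuple contributes a partial count of 1.
        List<Long> partials = Arrays.asList(1L, 1L, 1L, 1L, 1L);
        // Intermed stage: the combiner may sum any subset of the partials.
        long combinerA = partials.get(0) + partials.get(1);
        long combinerB = partials.get(2) + partials.get(3) + partials.get(4);
        // Final stage: the reducer sums the combiner outputs.
        System.out.println(combinerA + combinerB); // prints 5
    }
}
}}}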
The example below uses the `IsEmpty` builtin filter function to implement joins.

{{{#!java
-- inner join
A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
B = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);
...
}}}

Note that even if filtering is omitted, the same results will be produced, because the `foreach` result is a cross product and cross products get rid of empty bags. However, doing up-front filtering is more efficient since it reduces the input of the cross product.

{{{#!java
-- full outer join
A = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);
B = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);
...
}}}

The implementation of the `IsEmpty` function looks like this:

{{{#!java
import java.io.IOException;
import java.util.Map;
import org.apache.pig.FilterFunc;
...
}}}

The builtin `TOKENIZE` function shows how bags and tuples are created. The function takes a text string as input and returns a bag of words from the text. (Note that currently Pig bags always contain tuples.)

{{{#!java
 1. package org.apache.pig.builtin;
 2. import java.io.IOException;
 3. import java.util.StringTokenizer;
 4. import org.apache.pig.EvalFunc;
 5. import org.apache.pig.data.BagFactory;
 6. import org.apache.pig.data.DataBag;
 7. import org.apache.pig.data.Tuple;
 8. import org.apache.pig.data.TupleFactory;
 9. public class TOKENIZE extends EvalFunc<DataBag> {
10.     TupleFactory mTupleFactory = TupleFactory.getInstance();
11.     BagFactory mBagFactory = BagFactory.getInstance();
12.     public DataBag exec(Tuple input) throws IOException {
13.         try {
14.             DataBag output = mBagFactory.newDefaultBag();
15.             Object o = input.get(0);
16.             if (!(o instanceof String)) {
17.                 throw new IOException("Expected input to be chararray, but got " + o.getClass().getName());
18.             }
19.             StringTokenizer tok = new StringTokenizer((String)o, " \",()*", false);
20.             while (tok.hasMoreTokens()) output.add(mTupleFactory.newTuple(tok.nextToken()));
21.             return output;
22.         } catch (ExecException ee) {
23.             // error handling goes here
24.         }
25.     }
26. }
}}}

Lines 10 and 11 create tuple and bag factories, respectively. (A factory is a class that creates objects of a particular type. For more details, see the definition of [http://en.wikipedia.org/wiki/Factory_pattern the factory pattern]. The factory class itself is implemented as a singleton to guarantee that the same factory is used everywhere. For more details, see the definition of [http://en.wikipedia.org/wiki/Singleton_pattern the singleton pattern].)

Let's assume that we have a UDF `Swap` that, given a tuple with two fields, swaps their order. Let's assume that the UDF does not specify a schema, and look at the scripts below:

{{{#!java
1. register myudfs.jar;
2. A = load 'student_data' as (name: chararray, age: int, gpa: float);
3. B = foreach A generate flatten(myudfs.Swap(name, age)), gpa;
4. C = foreach B generate $2;
5. D = limit B 20;
6. dump D;
}}}

This script will result in an error caused by line 4.

The function, including the schema, looks like this:

{{{#!java
package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
...
}}}

The second parameter to the `FieldSchema` constructor is the schema representing this field, which in this case is a tuple with two fields. The third parameter represents the type of the schema, which in this case is a `TUPLE`. All supported schema types are defined in the `org.apache.pig.data.DataType` class.

{{{#!java
public class DataType {
    public static final byte UNKNOWN = 0;
    public static final byte NULL = 1;
    ...
}
}}}

The example above shows how to create an output schema for a tuple. Doing this for a bag is very similar. Let's extend the `TOKENIZE` function to do that:

{{{#!java
package org.apache.pig.builtin;

import java.io.IOException;
...
}}}
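For a UDF that returns a scalar rather than a tuple or bag, the schema is a one-liner. Below is a minimal, hypothetical sketch (the class and field names are illustrative, not part of the manual) that declares a single int output field:

{{{#!java
package myudfs;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

// Hypothetical UDF: returns the length of its chararray argument and
// tells Pig up front that the output is a single int field.
public class StrLen extends EvalFunc<Integer> {
    public Integer exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        String s = (String)input.get(0);
        return (s == null) ? null : Integer.valueOf(s.length());
    }
    public Schema outputSchema(Schema input) {
        // A single field named "len" of type int.
        return new Schema(new Schema.FieldSchema("len", DataType.INTEGER));
    }
}
}}}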
This example shows the implementation of the `ABS` function that returns the absolute value of a numeric value passed to it as input.

{{{#!java
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
...
}}}

The main thing to notice in this example is the `getArgToFuncMapping()` method. This method returns a list that contains a mapping from the input schema to the class that should be used to handle it. In this example, the main class handles the `bytearray` input and outsources the rest of the work to other classes implemented in separate files in the same package. An example of one such class is below. This class handles integer input values.

{{{#!java
import java.io.IOException;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.EvalFunc;
...
}}}

For instance, let's consider the function `MAX`, which is part of `piggybank`, described later in this document. Given two values, the function returns the larger value. The function table for `MAX` looks like this:

{{{#!java
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
    Util.addToFunctionList(funcList, IntMax.class.getName(), DataType.INTEGER);
    ...
}
}}}

Let's now see how this function can be used in a Pig script:

{{{#!java
REGISTER piggybank.jar
A = LOAD 'student_data' AS (name: chararray, gpa1: float, gpa2: double);
B = FOREACH A GENERATE name, org.apache.pig.piggybank.evaluation.math.MAX(gpa1, gpa2);
...
}}}

In this example, the function gets one parameter of type `float` and another of type `double`. The best fit will be the function that takes two double values. Pig makes this choice on the user's behalf by inserting implicit casts for the parameters. Running the script above is equivalent to running the script below:

{{{#!java
A = LOAD 'student_data' AS (name: chararray, gpa1: float, gpa2: double);
B = FOREACH A GENERATE name, org.apache.pig.piggybank.evaluation.math.MAX((double)gpa1, gpa2);
DUMP B;
}}}

Let's revisit the `UPPER` function from our first example. As it is written now, it would only work if the data passed to it is of type `chararray`. To make it work with data whose type is not explicitly set, a function table with a single entry needs to be added:

{{{#!java
package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
...
}}}

Now the following script will run:

{{{#!java
-- this is myscript.pig
REGISTER myudfs.jar;
A = LOAD 'student_data' AS (name, age, gpa);
...
}}}

For instance, the `UPPER` function would now look as follows:

{{{#!java
public class UPPER extends EvalFunc<String>
{
    public String exec(Tuple input) throws IOException {
    ...
}
}}}

=== Load Functions ===

Every load function needs to implement the `LoadFunc` interface. An abbreviated version is shown below. The full definition can be seen [http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/LoadFunc.java?view=markup here].

{{{#!java
public interface LoadFunc {
    public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException;
    public Tuple getNext() throws IOException;
    ...
}
}}}

Next come a number of conversion routines that convert data from `bytearray` to the requested type. This requires further explanation. By default, we would like the loader to do as little per-tuple processing as possible, because many tuples can be thrown out during filtering or joins, and many fields might not get used because they get projected out. If the data needs to be converted into another form, we would like this conversion to happen as late as possible. The majority of loaders should therefore return the data as bytearrays, and Pig will request a conversion from bytearray to the actual type only when needed. Let's look at the example below:

{{{#!java
A = load 'student_data' using PigStorage() as (name: chararray, age: int, gpa: float);
B = filter A by age > 25;
C = foreach B generate name;
...
}}}
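To make the late-conversion idea concrete, here is a minimal, hypothetical sketch of one such conversion routine in the style of a loader's `bytesToInteger` method (treat the exact signature as an assumption based on the abbreviated interface above):

{{{#!java
import java.io.IOException;

public class ConversionSketch {
    // Hypothetical conversion routine: Pig would call something like this
    // only when the int value is actually needed, e.g. for "age > 25" above.
    public Integer bytesToInteger(byte[] b) throws IOException {
        if (b == null) return null;              // preserve nulls
        try {
            return Integer.valueOf(new String(b).trim());
        } catch (NumberFormatException e) {
            return null;                         // unparsable data becomes null
        }
    }
}
}}}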
Here is an example of the function implemented by `BinStorage`:

{{{#!java
public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException {
    InputStream is = FileLocalizer.open(fileName, execType, storage);
    bindTo(fileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
    ...
}
}}}

All store functions need to implement the `StoreFunc` interface:

{{{#!java
public interface StoreFunc {
    public abstract void bindTo(OutputStream os) throws IOException;
    public abstract void putNext(Tuple f) throws IOException;
    ...
}
}}}

Here is an example of a simple store function that writes data as the string returned from the `toString` function.

{{{#!java
public class StringStore implements StoreFunc {
    OutputStream os;
    private byte recordDel = (byte)'\n';
    ...
}
}}}
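A store function can also be exercised by hand. The following is a minimal, hypothetical harness (the class name is illustrative, and the exact output is an assumption based on the description above, namely that `putNext` writes `toString()` followed by the record delimiter):

{{{#!java
import java.io.ByteArrayOutputStream;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class StringStoreTest {
    public static void main(String[] args) throws Exception {
        StringStore store = new StringStore();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        store.bindTo(bos);                  // attach the output stream
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "hello");
        store.putNext(t);                   // expected to write "(hello)\n"
        System.out.print(bos.toString());
    }
}
}}}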
The slicer has two basic functions: validate the input and slice up the input. Both of these methods will be called on the client machine.

{{{#!java
public interface Slicer {
    void validate(DataStorage store, String location) throws IOException;
    Slice[] slice(DataStorage store, String location) throws IOException;
}
}}}

Each slice describes a unit of work and will correspond to a map task in Hadoop.

{{{#!java
public interface Slice extends Serializable {
    String[] getLocations();
    void init(DataStorage store) throws IOException;
    ...
}
}}}

This example shows a simple `Slicer` that gets a count from the input stream and generates that number of `Slice` objects.

{{{#!java
public class RangeSlicer implements Slicer {
    /**
     * Expects location to be a Stringified integer, and makes
     ...
}}}