Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "PigAccumulatorSpec" page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec

--------------------------------------------------

New page:
= Accumulator UDF =

== Introduction ==
For data processing with PIG, it is very common to call "group by" or "cogroup" 
to group input tuples by a key, then call one or more UDFs to process each 
group. For example:

{{{
A = load 'mydata';
B = group A by $0;
C = foreach B generate group, myUDF1(A), myUDF2(A, 'some_param'), myUDF3(A);
store C into 'myresult';
}}}

In the current implementation, all tuples that belong to the same key are 
materialized into a DataBag during the grouping process, and the DataBag(s) are 
passed to the UDFs. This causes performance and memory problems. For a large 
key whose tuples cannot fit into memory, performance suffers because the extra 
data has to be spilled to disk.

Since many UDFs do not need to see all the tuples that belong to a key at the 
same time, it is possible to pass those tuples in batches. COUNT() and SUM() are 
good examples. Tuples can be passed to such UDFs in an accumulative manner. When 
all the tuples have been passed, a final method is called to retrieve the value. 
This way, we can minimize memory usage and improve performance by avoiding data 
spills.
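The memory argument above can be illustrated with a small Java sketch that is independent of PIG: the values of one key are consumed in fixed-size batches, and only one batch plus a running total is held at any time. `BatchedSum`, `sumInBatches`, and the `batchSize` parameter are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration: aggregate one key's values batch by batch
// instead of materializing all of them at once.
class BatchedSum {
    static long sumInBatches(Iterator<Long> values, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Long> batch = new ArrayList<>(batchSize); // at most one batch in memory
        long total = 0;                                // running aggregate across batches
        while (values.hasNext()) {
            batch.clear();
            while (batch.size() < batchSize && values.hasNext()) {
                batch.add(values.next());
            }
            for (long v : batch) {
                total += v;                            // "accumulate" the current batch
            }
        }
        return total;                                  // "final method": read the result once
    }
}
```

Memory use here is bounded by `batchSize`, not by the number of values for the key, which is the property the accumulative mode is after.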

== UDF change ==
An Accumulator interface is defined. UDFs that are able to process tuples in an 
accumulative manner should implement this interface. It is defined as follows:

{{{
import java.io.IOException;
import org.apache.pig.data.Tuple;

public interface Accumulator <T> {
    /**
     * Pass tuples to the UDF. You can retrieve a DataBag by calling b.get(index).
     * Each DataBag may contain zero or more tuples for the current key.
     */
    public void accumulate(Tuple b) throws IOException;

    /**
     * Called when all tuples for the current key have been passed to accumulate().
     * @return the value of the UDF for this key.
     */
    public T getValue();

    /**
     * Called after getValue() to prepare for processing the next key.
     */
    public void cleanup();

}
}}}
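As an illustration of the contract, here is a minimal sketch of a COUNT-style UDF written against the interface above. To keep it compilable without the PIG jar, it assumes stand-ins: `SimpleAccumulator` mirrors `Accumulator`, a Tuple is modeled as `List<Object>`, and a DataBag as a `List`. `SimpleAccumulator` and `CountAccumulator` are hypothetical names, not PIG's API.

```java
import java.io.IOException;
import java.util.List;

// Stand-in for the Accumulator interface above (assumption: Tuple is modeled
// as List<Object> and DataBag as a List, so no PIG classes are needed).
interface SimpleAccumulator<T> {
    void accumulate(List<Object> b) throws IOException;
    T getValue();
    void cleanup();
}

// Hypothetical COUNT-style UDF: counts the tuples in the bag held in field 0.
class CountAccumulator implements SimpleAccumulator<Long> {
    private long count = 0;  // the only state carried across batches of one key

    @Override
    public void accumulate(List<Object> b) {
        // Field 0 holds the bag for this batch; it may be empty.
        List<?> bag = (List<?>) b.get(0);
        count += bag.size();
    }

    @Override
    public Long getValue() {
        return count;
    }

    @Override
    public void cleanup() {
        count = 0;  // reset before the next key
    }
}
```

Following the interface comments, the engine would call accumulate() once per batch, getValue() once the key is exhausted, and cleanup() before the next key.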

A UDF should still extend EvalFunc as before. The PIG engine would detect, based 
on context, whether tuples can be processed accumulatively. If not, the regular 
EvalFunc interface would be called. Therefore, a UDF should implement both 
interfaces correctly.
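One way to satisfy both interfaces without duplicating logic is to let the regular exec() path treat the whole bag as a single batch. The sketch below assumes stand-ins (`EvalFuncLike` for EvalFunc, `AccumulatorLike` for Accumulator, `List<Object>` for Tuple); `SumUdf` is a hypothetical example, not a PIG builtin, and it sums field 0 of each tuple in the bag.

```java
import java.io.IOException;
import java.util.List;

// Stand-ins so the sketch compiles without the PIG jar (assumptions, not PIG's API):
// a Tuple is modeled as List<Object>, a DataBag as List<List<Object>>.
abstract class EvalFuncLike<T> {
    public abstract T exec(List<Object> input) throws IOException;
}

interface AccumulatorLike<T> {
    void accumulate(List<Object> b) throws IOException;
    T getValue();
    void cleanup();
}

// Hypothetical SUM UDF supporting both calling conventions.
class SumUdf extends EvalFuncLike<Long> implements AccumulatorLike<Long> {
    private long sum = 0;

    @Override
    public void accumulate(List<Object> b) {
        @SuppressWarnings("unchecked")
        List<List<Object>> bag = (List<List<Object>>) b.get(0);
        for (List<Object> t : bag) {
            sum += ((Number) t.get(0)).longValue();  // add field 0 of each tuple
        }
    }

    @Override
    public Long getValue() {
        return sum;
    }

    @Override
    public void cleanup() {
        sum = 0;
    }

    // Regular path: the whole bag arrives in one call and is treated as one batch.
    // Assumes the engine never interleaves the two modes on one instance.
    @Override
    public Long exec(List<Object> input) {
        try {
            accumulate(input);
            return getValue();
        } finally {
            cleanup();  // leave no state behind for the next call
        }
    }
}
```

Reusing accumulate() inside exec() keeps the two paths from drifting apart; a UDF could equally implement them separately, as long as both return the same value.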

== Use Cases ==
The PIG engine would process tuples accumulatively only when all of the UDFs 
implement the Accumulator interface. If any one of the UDFs is not an 
Accumulator, then all UDFs are called through their EvalFunc interface as 
regular UDFs. The following are examples in which the Accumulator interface of 
the UDFs would be called:

   * group by 
     {{{
     A = load 'mydata';
     B = group A by $0;
     C = foreach B generate group, myUDF(A);
     store C into 'myresult';
     }}}

   * cogroup 
     {{{
     A = load 'mydata1';
     B = load 'mydata2';
     C = cogroup A by $0, B by $0;
     D = foreach C generate group, myUDF(A), myUDF(B);
     store D into 'myresult';
     }}}

   * group by with sort
     {{{
     A = load 'mydata';
     B = group A by $0;
     C = foreach B {
         D = order A by $1;
         generate group, myUDF(D);
       }
     store C into 'myresult';
     }}}

   * group by with distinct
     {{{
     A = load 'mydata';
     B = group A by $0;
     C = foreach B {
         D = A.$1;
         E = distinct D;
         generate group, myUDF(E);
       }
     store C into 'myresult';
     }}}

== When to Call Accumulator ==
The MR plan is evaluated by an AccumulatorOptimizer to check whether it is 
eligible to run in accumulative mode. Before the AccumulatorOptimizer is called, 
another optimizer, SecondaryKeyOptimizer, should be called first. This optimizer 
checks whether a POSort or PODistinct in the inner plan of a foreach can be 
removed or replaced by using the secondary sort key supported by Hadoop. A 
POSort is removed; a PODistinct is replaced by POSortedDistinct. Because of this 
optimizer, the last two use cases (order by and distinct inside the foreach 
inner plan) can still run in accumulative mode.
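Why the secondary-key rewrite helps: once Hadoop's secondary sort delivers a key's values already ordered, a POSort becomes unnecessary, and duplicates become adjacent, so distinct only needs to remember the previously emitted value rather than a set of everything seen. A minimal Java sketch of that idea follows; `SortedDistinctSketch` is hypothetical and is not PIG's POSortedDistinct.

```java
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch: distinct over input that is already sorted, streaming
// results to a consumer so only the previous value is kept in memory.
class SortedDistinctSketch {
    static <T> void distinctSorted(Iterator<T> sorted, Consumer<T> emit) {
        boolean first = true;
        T previous = null;
        while (sorted.hasNext()) {
            T current = sorted.next();
            // Equal values are adjacent in sorted input, so comparing with the
            // previously emitted value is enough to drop duplicates.
            if (first || !Objects.equals(current, previous)) {
                emit.accept(current);
                previous = current;
                first = false;
            }
        }
    }
}
```

Because it never buffers more than one value, this kind of distinct can process a key batch by batch, which is what allows the distinct use case to stay in accumulative mode.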

The AccumulatorOptimizer checks the reducer plan and enables the accumulator if 
the following criteria are met:
   * The reducer plan uses POPackage as its root, not any of its subclasses. The POPackage is not used for distinct, and none of its inputs is set as inner.
   * The successor of POPackage is a POForEach.
   * The leaf of each POForEach input plan is an ExpressionOperator, and it must be one of the following:
         * ConstantExpression
         * POProject, whose result type is neither BAG nor an overloaded TUPLE
         * POMapLookup
         * POCase
         * UnaryExpressionOperator
         * BinaryExpressionOperator
         * POBinCond
         * POUserFunc that implements the Accumulator interface and whose inputs contain only ExpressionOperators, POForEach, or POSortedDistinct, but not another POUserFunc.

Therefore, if there are multiple UDFs under a POForEach and some are 
Accumulators while others are not, accumulative mode is turned off.
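The criteria above can be read as a single predicate over the reduce plan. The sketch below uses a hypothetical, heavily simplified plan model (`ReducePlanModel`, `PlanLeaf`, `LeafKind`, `AccumulatorEligibility`), not PIG's physical operator classes, and it reads the POProject rule as "result type is neither BAG nor an overloaded TUPLE".

```java
import java.util.List;

// Hypothetical, simplified plan model for illustration; not PIG's classes.
enum LeafKind {
    CONSTANT, PROJECT, MAP_LOOKUP, CASE, UNARY, BINARY, BIN_COND, USER_FUNC, OTHER
}

final class PlanLeaf {
    final LeafKind kind;
    final boolean projectsBagOrOverloadedTuple; // used only for PROJECT
    final boolean udfIsAccumulator;             // used only for USER_FUNC
    final boolean udfInputsAllowed;             // USER_FUNC inputs are only expression
                                                // operators, POForEach or POSortedDistinct

    PlanLeaf(LeafKind kind, boolean projectsBagOrOverloadedTuple,
             boolean udfIsAccumulator, boolean udfInputsAllowed) {
        this.kind = kind;
        this.projectsBagOrOverloadedTuple = projectsBagOrOverloadedTuple;
        this.udfIsAccumulator = udfIsAccumulator;
        this.udfInputsAllowed = udfInputsAllowed;
    }
}

final class ReducePlanModel {
    final boolean rootIsPlainPOPackage;  // POPackage itself, not a subclass
    final boolean packageForDistinct;
    final boolean anyInputInner;
    final boolean successorIsPOForEach;
    final List<PlanLeaf> forEachLeaves;  // the leaf of each POForEach input plan

    ReducePlanModel(boolean rootIsPlainPOPackage, boolean packageForDistinct,
                    boolean anyInputInner, boolean successorIsPOForEach,
                    List<PlanLeaf> forEachLeaves) {
        this.rootIsPlainPOPackage = rootIsPlainPOPackage;
        this.packageForDistinct = packageForDistinct;
        this.anyInputInner = anyInputInner;
        this.successorIsPOForEach = successorIsPOForEach;
        this.forEachLeaves = forEachLeaves;
    }
}

class AccumulatorEligibility {
    static boolean eligible(ReducePlanModel plan) {
        if (!plan.rootIsPlainPOPackage || plan.packageForDistinct || plan.anyInputInner) {
            return false;
        }
        if (!plan.successorIsPOForEach) {
            return false;
        }
        // One disallowed leaf (for example a non-Accumulator UDF) turns accumulation off.
        for (PlanLeaf leaf : plan.forEachLeaves) {
            if (!leafAllowed(leaf)) {
                return false;
            }
        }
        return true;
    }

    static boolean leafAllowed(PlanLeaf leaf) {
        switch (leaf.kind) {
            case CONSTANT:
            case MAP_LOOKUP:
            case CASE:
            case UNARY:
            case BINARY:
            case BIN_COND:
                return true;
            case PROJECT:
                return !leaf.projectsBagOrOverloadedTuple;
            case USER_FUNC:
                return leaf.udfIsAccumulator && leaf.udfInputsAllowed;
            default:
                return false;
        }
    }
}
```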

== Design ==

Once the optimizer detects that the reduce plan can run accumulatively, it sets 
a flag on POPackage and POForEach to indicate that the data is going to be 
processed in accumulative mode. POForEach in turn sets this flag on all the 
operators of its input plans.
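The flag propagation can be sketched with a hypothetical operator model (`PlanOperator` is not a PIG class, and real physical plans are graphs rather than this simple tree): setting the flag on an operator also sets it on every operator in its inner plans, as POForEach does for its input plans.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator model; PIG's physical operators are richer than this.
class PlanOperator {
    final String name;
    final List<PlanOperator> innerPlanOperators = new ArrayList<>();
    private boolean accumulative = false;

    PlanOperator(String name) {
        this.name = name;
    }

    // Marks this operator and, recursively, every operator in its inner plans.
    void setAccumulative() {
        accumulative = true;
        for (PlanOperator op : innerPlanOperators) {
            op.setAccumulative();
        }
    }

    boolean isAccumulative() {
        return accumulative;
    }
}
```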

During runtime, POPackage creates a tuple with AccumulativeBags as its fields. 
Each bag wraps an AccumulativeTupleFeeder, which holds a handle to the reducer 
Iterator so that it can pull the next batch of tuples, and a buffer that holds 
the tuples of the current batch. All AccumulativeBags share the same feeder. The 
tuple generated by POPackage is passed to POForEach, which gets the 
AccumulativeTupleFeeder from an AccumulativeBag. POForEach then calls 
feeder.nextBatch() to fill the AccumulativeBags with the first batch of tuples 
and passes them to the POUserFunc. Because the POUserFunc is marked as 
accumulative, it calls accumulate() on the UDF and returns with a status code of 
STATUS_BATCH_OK. POForEach then pulls the next batch, and so on, until the last 
batch of tuples has been retrieved and processed. At the end, POForEach notifies 
the POUserFunc that accumulation is done and makes a final call to it; the 
POUserFunc in turn calls getValue() to return the final result.
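The batch loop can be summarized with a simplified Java sketch. It assumes a single input (one bag view rather than one AccumulativeBag per cogroup input), models tuples as `Long` values, and uses hypothetical names (`TupleFeeder`, `BatchUdf`, `ForEachDriver`); it mirrors the control flow described above, not PIG's actual classes or status codes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an accumulator UDF; a batch is a list of Long values.
interface BatchUdf<T> {
    void accumulate(List<Long> batch);
    T getValue();
    void cleanup();
}

// Plays the role of AccumulativeTupleFeeder: holds the reducer iterator and
// one shared buffer for the current batch.
class TupleFeeder {
    private final Iterator<Long> reducerValues;
    private final int batchSize;
    private final List<Long> buffer = new ArrayList<>();

    TupleFeeder(Iterator<Long> reducerValues, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.reducerValues = reducerValues;
        this.batchSize = batchSize;
    }

    boolean hasMore() {
        return reducerValues.hasNext();
    }

    // Replaces the buffer contents with the next batch from the iterator.
    void nextBatch() {
        buffer.clear();
        while (buffer.size() < batchSize && reducerValues.hasNext()) {
            buffer.add(reducerValues.next());
        }
    }

    // Read-only view of the current batch, standing in for an AccumulativeBag.
    List<Long> currentBatch() {
        return Collections.unmodifiableList(buffer);
    }
}

// Plays the role of POForEach in accumulative mode for one key.
class ForEachDriver {
    static List<Object> processKey(TupleFeeder feeder, List<BatchUdf<?>> udfs) {
        // Pull batches until the key's tuples are exhausted, feeding each to every UDF.
        while (feeder.hasMore()) {
            feeder.nextBatch();
            for (BatchUdf<?> udf : udfs) {
                udf.accumulate(feeder.currentBatch());
            }
        }
        // Final call: collect each UDF's value, then reset it for the next key.
        List<Object> results = new ArrayList<>();
        for (BatchUdf<?> udf : udfs) {
            results.add(udf.getValue());
            udf.cleanup();
        }
        return results;
    }
}
```

Because every bag view reads the feeder's single buffer, memory use is bounded by the batch size rather than by the number of tuples for the key.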

The following is the sequence diagram of the data flow:
<img src="http://twiki.corp.yahoo.com/pub/Tiger/AccumulatorUDF/SequenceDiagram3.jpg" width="500" height="500">
