Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "PigAccumulatorSpec" page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=1&rev2=2

--------------------------------------------------

  = Accumulator UDF =
  == Introduction ==
  For data processing with PIG, it is very common to call "group by" or 
"cogroup" to group input tuples by a key, then call one or more UDFs to process 
each group. For example:
  

  C = foreach B generate group, myUDF1(A), myUDF2(A, 'some_param'), myUDF3(A);
  store C into 'myresult';
  }}}
 
  In the current implementation, during the grouping process, all tuples that belong to the same key are materialized into a DataBag, and the DataBag(s) are passed to the UDFs. This causes performance and memory problems. For a large key, if its tuples cannot fit into memory, performance must be sacrificed to spill the extra data to disk.
  
  Since many UDFs do not really need to see all the tuples that belong to a key at the same time, it is possible to pass those tuples in batches. Good examples are COUNT() and SUM(). Tuples can be passed to such UDFs in an accumulative manner, and when all the tuples have been passed, a final method is called to retrieve the value. This way, we can minimize memory usage and improve performance by avoiding data spills.
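 As a hedged illustration in plain Java (no Pig classes involved; the BatchSum class and its batches are made up for this sketch), the accumulative idea amounts to keeping a small running state and seeing only one batch of tuples at a time:
 
```java
import java.util.Arrays;
import java.util.List;

// Sketch only: a plain-Java stand-in for an accumulative SUM.
// Tuples arrive in batches, so memory is bounded by the batch size,
// not by the total number of tuples for the key.
public class BatchSum {
    private long running = 0;  // state carried across batches for one key

    // Called once per batch; only this batch is held in memory.
    public void accumulate(List<Long> batch) {
        for (long v : batch) {
            running += v;
        }
    }

    // Called after the last batch to retrieve the final value.
    public long getValue() {
        return running;
    }

    public static void main(String[] args) {
        BatchSum sum = new BatchSum();
        sum.accumulate(Arrays.asList(1L, 2L, 3L));  // first batch
        sum.accumulate(Arrays.asList(4L, 5L));      // second batch
        System.out.println(sum.getValue());         // prints 15
    }
}
```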
  

  {{{
  public interface Accumulator <T> {
     /**
      * Pass tuples to the UDF. You can retrieve a DataBag by calling b.get(index).
      * Each DataBag may contain 0 to many tuples for the current key.
      */
     public void accumulate(Tuple b) throws IOException;

     /**
      * Called when all tuples from the current key have been passed to accumulate().
      * @return the value for the UDF for this key.
      */
     public T getValue();

     /**
      * Called after getValue() to prepare processing for the next key.
      */
     public void cleanup();
  
  }
  }}}
 
  A UDF should still extend EvalFunc as before. The PIG engine detects, based on context, whether tuples can be processed accumulatively. If not, the regular EvalFunc is called. Therefore, a UDF should implement both interfaces properly.
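 A minimal sketch of what such a dual-path UDF could look like, assuming simplified local stand-ins (a plain List in place of a DataBag, and no real org.apache.pig.EvalFunc or Accumulator types):
 
```java
import java.util.Arrays;
import java.util.List;

// Sketch with hypothetical stand-in types; a real UDF would extend
// org.apache.pig.EvalFunc and implement org.apache.pig.Accumulator.
public class MyCount {
    private long intermediateCount = 0;   // accumulator state for one key

    // Regular EvalFunc-style path: sees the whole bag at once.
    public Long exec(List<Object> bag) {
        return (long) bag.size();
    }

    // Accumulator path: called once per batch of tuples.
    public void accumulate(List<Object> batch) {
        intermediateCount += batch.size();
    }

    public Long getValue() {
        return intermediateCount;
    }

    // Reset state so the next key starts from scratch.
    public void cleanup() {
        intermediateCount = 0;
    }

    public static void main(String[] args) {
        MyCount udf = new MyCount();
        List<Object> all = Arrays.asList("a", "b", "c", "d");
        udf.accumulate(all.subList(0, 2));
        udf.accumulate(all.subList(2, 4));
        // Both call paths should agree on the result.
        System.out.println(udf.exec(all).equals(udf.getValue())); // prints true
        udf.cleanup();
    }
}
```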
  
  == Use Cases ==
  The PIG engine processes tuples accumulatively only when all of the UDFs implement the Accumulator interface. If one of the UDFs is not an Accumulator, then all UDFs are called through their EvalFunc interface as regular UDFs. Following are examples where the accumulator interface of the UDFs would be called:
  
  * group by
   . {{{
      A = load 'mydata';
      B = group A by $0;
      C = foreach B generate group, myUDF(A);
      store C into 'myresult';
 }}}
  
  * cogroup
   . {{{
      A = load 'mydata1';
      B = load 'mydata2';
      C = cogroup A by $0, B by $0;
      D = foreach C generate group, myUDF(A), myUDF(B);
      store D into 'myresult';
 }}}
  
  * group by with sort
   . {{{
      A = load 'mydata';
      B = group A by $0;
      C = foreach B {
          D = order A by $1;
          generate group, myUDF(D);
        }
      store C into 'myresult';
 }}}
  
  * group by with distinct
   . {{{
      A = load 'mydata';
      B = group A by $0;
      C = foreach B {
          E = distinct A;
          generate group, myUDF(E);
        }
      store C into 'myresult';
 }}}
  
  == When to Call Accumulator ==
  . The MR plan is evaluated by an AccumulatorOptimizer to check whether it is eligible to run in accumulative mode. Before the AccumulatorOptimizer is called, another optimizer, SecondaryKeyOptimizer, runs first. This optimizer checks whether a POSort or PODistinct in the inner plan of a foreach can be removed or replaced by using the secondary sort key supported by Hadoop: a POSort is removed, and a PODistinct is replaced by POSortedDistinct. Because of this optimizer, the last two use cases, with order by and distinct inside the foreach inner plan, can still run in accumulative mode.
 
  The AccumulatorOptimizer checks the reducer plan and enables the accumulator if the following criteria are met:
   * The reducer plan uses POPackage as its root, not any of its sub-classes. The POPackage is not for distinct, and none of its inputs is set as inner.
   * The successor of the POPackage is a POForEach.
   * The leaf of each POForEach input plan is an ExpressionOperator, and it must be one of the following:
    * ConstantExpression
    * POProject, whose result type is not BAG, or is TUPLE and overloaded
    * POMapLookup
    * POCast
    * UnaryExpressionOperator
    * BinaryExpressionOperator
    * POBinCond
    * A POUserFunc that implements the Accumulator interface and whose inputs contain only ExpressionOperators, POForEach, or POSortedDistinct, but not another POUserFunc.
  
  Therefore, if there are multiple UDFs under a POForEach, some of which are Accumulators while some are not, accumulative mode is turned off.
  
  == Design ==
 
  Once the optimizer detects that the reduce plan can run accumulatively, it sets a flag on POPackage and POForEach to indicate that the data is going to be processed in accumulative mode. POForEach in turn sets this flag on all the operators of its input plans.
  
  During runtime, POPackage creates a tuple with AccumulativeBag as its fields. This bag wraps an AccumulativeTupleFeeder, which holds a handle to the reducer Iterator to pull the next batch of tuples, as well as a buffer to hold the tuples of the current batch. All AccumulativeBags share the same feeder. The tuple generated by POPackage is passed to POForEach, which gets the AccumulativeTupleFeeder from the AccumulativeBag. It then calls feeder.nextBatch() to fill the AccumulativeBag with the first batch of tuples and passes them to the POUserFunc. Because the POUserFunc is marked as accumulative, it calls the accumulate() method of the UDF and returns with a code of STATUS_BATCH_OK. Then POForEach pulls the next batch, and so on, until the last batch of tuples is retrieved and processed. At the end, POForEach notifies the POUserFunc that accumulation is done and makes a final call to it, which in turn calls getValue() to return the final result.
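 The batch-driven control flow described above can be sketched as follows; Feeder, SumUdf, and run() are hypothetical stand-ins modeled on the names in the text (AccumulativeTupleFeeder, POUserFunc, POForEach), not the real Pig internals:
 
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the accumulative reduce-side loop: a feeder pulls
// batches from the reducer iterator, the foreach driver feeds each batch to
// the UDF, and getValue() is called once after the last batch.
public class AccumulativeLoop {
    // Stand-in for AccumulativeTupleFeeder: buffers one batch at a time.
    static class Feeder {
        private final Iterator<Long> reducerIterator;
        private final int batchSize;

        Feeder(Iterator<Long> it, int batchSize) {
            this.reducerIterator = it;
            this.batchSize = batchSize;
        }

        // Returns the next batch, or an empty list when input is exhausted.
        List<Long> nextBatch() {
            List<Long> buffer = new ArrayList<>();
            while (buffer.size() < batchSize && reducerIterator.hasNext()) {
                buffer.add(reducerIterator.next());
            }
            return buffer;
        }
    }

    // Stand-in for an accumulative SUM UDF.
    static class SumUdf {
        private long sum = 0;
        void accumulate(List<Long> batch) { for (long v : batch) sum += v; }
        long getValue() { return sum; }
        void cleanup() { sum = 0; }
    }

    // Stand-in for the POForEach driving loop for one key.
    static long run(Iterator<Long> tuplesForKey) {
        Feeder feeder = new Feeder(tuplesForKey, 2);
        SumUdf udf = new SumUdf();
        List<Long> batch = feeder.nextBatch();
        while (!batch.isEmpty()) {
            udf.accumulate(batch);       // STATUS_BATCH_OK in the real engine
            batch = feeder.nextBatch();
        }
        long result = udf.getValue();    // final call after the last batch
        udf.cleanup();                   // ready for the next key
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1L, 2L, 3L, 4L, 5L).iterator())); // prints 15
    }
}
```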
  
  Following is the sequence diagram of the data flow:
  
  {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
