The "PigAccumulatorSpec" page has been changed by yinghe.
http://wiki.apache.org/pig/PigAccumulatorSpec?action=diff&rev1=2&rev2=3

--------------------------------------------------

  }}}
  
  == When to Call Accumulator ==
-  . MR plan is evaluated by an AccumulatorOptimizer to check if it is eligible 
to run in accumulative mode. Before AccumulatorOptimizer is called, another 
optimizer, SecondaryKeyOptimizer, should be called first. This optimizer checks 
if POSort or PODistinct in the inner plan of foreach can be removed/replaced by 
using secondary sorting key supported by hadoop. If it is POSort, then it is 
removed. If it is PODistinct, it is replaced by POSortedDistinct. Because of 
this optimizer, the last two use cases with order by and distinct inside 
foreach inner plan can still run in accumulative mode.
+  . The MR plan is evaluated by an AccumulatorOptimizer to check whether it is 
eligible to run in accumulative mode. Before AccumulatorOptimizer is called, 
another optimizer, SecondaryKeyOptimizer, should be called first. This 
optimizer checks whether a POSort or PODistinct in the inner plan of a foreach 
can be removed or replaced by using the secondary sort key supported by Hadoop. 
A POSort is removed; a PODistinct is replaced by POSortedDistinct. Because of 
this optimizer, the last two use cases, with order by and distinct inside the 
foreach inner plan, can still run in accumulative mode. The AccumulatorOptimizer 
checks the reducer plan and enables the accumulator if the following criteria 
are met:
-  The AccumulatorOptimizer checks the reducer plan and enables accumulator if 
following criteria are met:
    * The reducer plan uses POPackage as root, not any of its sub-classes. 
POPackage is not for distinct, and none of its inputs is set as inner.
    * The successor of POPackage is a POForEach.
    * Each leaf of every POForEach input plan is an ExpressionOperator and 
must be one of the following:
@@ -109, +108 @@

  
  {{attachment:/homes/yinghe/Desktop/SequenceDiagram.jpg}}
  
+ == Internal Changes ==
+ === Accumulator ===
+  . A new interface that a UDF can implement if it can run in accumulative mode.
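
As a rough illustration of what such an interface might look like, the sketch below uses the three method names that POUserFunc is described as calling (accumulate(), getValue(), cleanup()). The interface shape, the use of a plain List<Long> in place of a Pig tuple, and the names AccumulatorSketch and SumSketch are assumptions made for this example, not the actual Pig API.

```java
import java.util.List;

// Hypothetical stand-in for the Accumulator interface. The real interface
// works on Pig tuples; a plain List<Long> replaces them in this sketch.
interface AccumulatorSketch<T> {
    void accumulate(List<Long> batch); // called once per batch of values
    T getValue();                      // called after the last batch
    void cleanup();                    // resets state before the next key
}

// A SUM-like function that keeps only a running total between batches,
// so it never needs the whole bag in memory at once.
class SumSketch implements AccumulatorSketch<Long> {
    private long total = 0;

    @Override
    public void accumulate(List<Long> batch) {
        for (long v : batch) {
            total += v;
        }
    }

    @Override
    public Long getValue() {
        return total;
    }

    @Override
    public void cleanup() {
        total = 0;
    }
}
```

The point of the design is that the function holds a small amount of state (here one long) instead of the full bag.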
+ 
+ === PhysicalOperator ===
+  . Add new methods setAccumulative(), setAccumStart() and setAccumEnd() to 
flag a physical operator to run in accumulative mode, and to mark the start and 
end of accumulation. This change is in the patch for PIG-1038.
+ 
+ === MapReduceLauncher ===
+  . Create AccumulatorOptimizer and use it to visit the plan.
+ 
+ === AccumulatorOptimizer ===
+  . Another MROpPlanVisitor. It checks the reduce plan and, if the plan meets 
all the criteria, sets the "accumulative" flag on POPackage and POForEach. It 
is created and invoked by MapReduceLauncher.
+ 
+ === POStatus ===
+  . Add a new state "STATUS_BATCH_OK" to indicate a batch is processed 
successfully in accumulative mode.
+ 
+ === POForEach ===
+  . If its "accumulative" flag is set, the bags passed to it in a tuple are 
AccumulativeBags rather than regular tuple bags. POForEach gets the 
AccumulativeTupleBuffer from the bag, then runs a while loop that calls 
nextBatch() on the AccumulativeTupleBuffer and passes the input to the inner 
plans. If an inner plan contains any UDF, it returns POStatus.STATUS_BATCH_OK 
when the current batch is processed successfully. When there are no more 
batches to process, POForEach notifies each inner plan that accumulation is 
done, makes a final call to get the result, and exits the while loop. At the 
end, POForEach returns the result to its successor in the reducer plan. The 
operators that call POForEach don't need to know whether POForEach got its 
result through regular mode or accumulative mode.
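
The batch loop described above can be sketched roughly as follows. Only nextBatch() is named in the text; hasNextBatch(), currentBatch(), the class names and the use of Long values instead of tuples are assumptions made for illustration, and the "inner plan" is reduced to a running sum.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical buffer that pulls values from the reducer iterator in batches.
class BatchBufferSketch {
    private final Iterator<Long> source;
    private final int batchSize;
    private final List<Long> current = new ArrayList<>();

    BatchBufferSketch(Iterator<Long> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    boolean hasNextBatch() {
        return source.hasNext();
    }

    // Replaces the current batch with up to batchSize new values.
    void nextBatch() {
        current.clear();
        while (source.hasNext() && current.size() < batchSize) {
            current.add(source.next());
        }
    }

    List<Long> currentBatch() {
        return current;
    }
}

class ForEachLoopSketch {
    // Feeds every batch to a running sum, then returns the final value.
    static long run(BatchBufferSketch buffer) {
        long total = 0;                      // state held by the "inner plan"
        while (buffer.hasNextBatch()) {
            buffer.nextBatch();
            for (long v : buffer.currentBatch()) {
                total += v;                  // one batch processed
            }
        }
        return total;                        // final call after the last batch
    }
}
```

The caller of run() sees a single result, which mirrors the statement that the successor does not need to know which mode produced it.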
+ 
+ === AccumulativeBag ===
+  . An implementation of DataBag used by POPackage for processing data in 
accumulative mode. This bag doesn't contain all tuples from the iterator. 
Instead, it wraps an AccumulativeTupleBuffer, which contains an iterator to 
pull tuples out in batches. Calling iterator() on this bag only gives you the 
tuples for the current batch.
+ 
+ === AccumulativeTupleBuffer ===
+  . An underlying buffer that is shared by all AccumulativeBags (one bag for 
group by, multiple bags for cogroup) generated by POPackage. POPackage has an 
inner class that implements this interface. POPackage creates an instance of 
this buffer and sets it into the AccumulativeBags. This buffer has methods to 
retrieve the next batch of tuples, which in turn call methods of POPackage to 
read tuples out of the iterator and put them in an internal list. The 
AccumulativeBag has access to that list to return an iterator of tuples.
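
To make the relationship between the bags and the shared buffer more concrete, here is a minimal sketch for the cogroup case with two inputs. All class and method names (TaggedValue, SharedBufferSketch, BagViewSketch, tuplesFor()) are hypothetical, and strings stand in for tuples; the real classes are tied to POPackage and the Hadoop reducer iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical record: a value tagged with the index of the input it came from.
class TaggedValue {
    final int input;
    final String value;

    TaggedValue(int input, String value) {
        this.input = input;
        this.value = value;
    }
}

// Hypothetical shared buffer: one internal list per input, refilled per batch.
class SharedBufferSketch {
    private final Iterator<TaggedValue> source;
    private final int batchSize;
    private final List<List<String>> perInput = new ArrayList<>();

    SharedBufferSketch(Iterator<TaggedValue> source, int numInputs, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
        for (int i = 0; i < numInputs; i++) {
            perInput.add(new ArrayList<>());
        }
    }

    boolean hasNextBatch() {
        return source.hasNext();
    }

    // Clears all per-input lists, then reads up to batchSize records.
    void nextBatch() {
        for (List<String> list : perInput) {
            list.clear();
        }
        int read = 0;
        while (source.hasNext() && read < batchSize) {
            TaggedValue tv = source.next();
            perInput.get(tv.input).add(tv.value);
            read++;
        }
    }

    Iterator<String> tuplesFor(int input) {
        return perInput.get(input).iterator();
    }
}

// Hypothetical bag: holds no tuples itself, only a view onto the shared buffer.
class BagViewSketch implements Iterable<String> {
    private final SharedBufferSketch buffer;
    private final int input;

    BagViewSketch(SharedBufferSketch buffer, int input) {
        this.buffer = buffer;
        this.input = input;
    }

    @Override
    public Iterator<String> iterator() {
        return buffer.tuplesFor(input); // only the current batch is visible
    }
}
```

Because every bag reads from the same buffer, advancing the buffer once moves all bags to the next batch together.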
+ 
+ === POPackage ===
+  . If its "accumulative" flag is set, it creates AccumulativeBag and 
AccumulativeTupleBuffer instances instead of the default tuple bags. It then 
sets the AccumulativeTupleBuffer into each AccumulativeBag, and sets the 
AccumulativeBag into the result tuple.
+  POPackage also has an inner class that implements the 
AccumulativeTupleBuffer interface.
+ 
+ === ExpressionOperator ===
+  . Add new methods getChildExpression(), containUDF() and accumChild(). 
accumChild() is called by all expression operators that have more than one 
child operator. The expression operator needs to drive all child operators 
that contain a UDF to process batched data. If it is in accumulative mode, 
accumChild() returns POStatus.STATUS_BATCH_OK. If it is not in accumulative 
mode, accumChild() returns null.
+ 
+ === POUserFunc ===
+  . Which UDF method is called depends on the operator's state. If the 
accumulative flag is on and accumulation has started, it calls accumulate(). 
If accumulation has ended, it calls getValue(), followed by cleanup(). If the 
accumulative flag is off, it calls exec().
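
The dispatch rule above could look roughly like the sketch below. The setter names come from the PhysicalOperator section and STATUS_BATCH_OK from the POStatus section; everything else (StatusSketch, ResultSketch, CountUdfSketch, UserFuncSketch, and the use of List<String> for input) is a hypothetical simplification, not the real POUserFunc code.

```java
import java.util.List;

enum StatusSketch { STATUS_OK, STATUS_BATCH_OK }

// Hypothetical result holder: a status plus an optional value.
class ResultSketch {
    final StatusSketch status;
    final Long value; // null while batches are still being accumulated

    ResultSketch(StatusSketch status, Long value) {
        this.status = status;
        this.value = value;
    }
}

// Hypothetical COUNT-like UDF offering both regular and accumulative calls.
class CountUdfSketch {
    private long count = 0;

    Long exec(List<String> all) { return (long) all.size(); }
    void accumulate(List<String> batch) { count += batch.size(); }
    Long getValue() { return count; }
    void cleanup() { count = 0; }
}

class UserFuncSketch {
    private final CountUdfSketch udf = new CountUdfSketch();
    private boolean accumulative;
    private boolean accumStarted;

    void setAccumulative() { accumulative = true; }
    void setAccumStart() { accumStarted = true; }
    void setAccumEnd() { accumStarted = false; }

    ResultSketch getNext(List<String> input) {
        if (!accumulative) {
            // Regular mode: the whole bag is passed to exec().
            return new ResultSketch(StatusSketch.STATUS_OK, udf.exec(input));
        }
        if (accumStarted) {
            // Accumulation in progress: consume one batch, no value yet.
            udf.accumulate(input);
            return new ResultSketch(StatusSketch.STATUS_BATCH_OK, null);
        }
        // Accumulation ended: fetch the value, then reset the UDF.
        Long value = udf.getValue();
        udf.cleanup();
        return new ResultSketch(StatusSketch.STATUS_OK, value);
    }
}
```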
+ 
+ === Other ExpressionOperators ===
+  . All of the following operators are changed to call accumChild() in 
getNext(). If it returns null, they fall back to their regular logic.
+   * GreaterThanExpr
+   * EqualToExpr
+   * LTOrEqualToExpr
+   * LessThanExpr
+   * NotEqualToExpr
+   * GTOrEqualToExpr
+   * Divide
+   * Add
+   * Mod
+   * Multiply
+   * Subtract
+   * POOr
+   * POAnd
+   * POBinCond
+   * PORegexp
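
A minimal sketch of the accumChild() pattern for a binary operator such as Add is shown below. The names containUDF(), accumChild(), getNext() and STATUS_BATCH_OK come from the text above; the class names, the setAccumStarted() helper, and the way the UDF child folds batches into a running sum are assumptions made so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;

enum ExprStatus { STATUS_OK, STATUS_BATCH_OK }

class ExprResult {
    final ExprStatus status;
    final Long value; // null while batches are still being accumulated

    ExprResult(ExprStatus status, Long value) {
        this.status = status;
        this.value = value;
    }
}

abstract class ExprSketch {
    protected boolean accumStarted;

    void setAccumStarted(boolean started) { accumStarted = started; }
    abstract boolean containUDF();
    abstract ExprResult getNext(List<Long> input);
}

class ConstSketch extends ExprSketch {
    private final long c;

    ConstSketch(long c) { this.c = c; }
    @Override boolean containUDF() { return false; }
    @Override ExprResult getNext(List<Long> input) {
        return new ExprResult(ExprStatus.STATUS_OK, c);
    }
}

// Stand-in for a UDF child: sums its input, batch by batch if accumulating.
class SumUdfSketch extends ExprSketch {
    private long total;

    @Override boolean containUDF() { return true; }
    @Override ExprResult getNext(List<Long> input) {
        for (long v : input) total += v;
        if (accumStarted) return new ExprResult(ExprStatus.STATUS_BATCH_OK, null);
        long result = total;
        total = 0;
        return new ExprResult(ExprStatus.STATUS_OK, result);
    }
}

class AddSketch extends ExprSketch {
    private final List<ExprSketch> children;

    AddSketch(ExprSketch left, ExprSketch right) {
        children = Arrays.asList(left, right);
    }

    @Override void setAccumStarted(boolean started) {
        super.setAccumStarted(started);
        for (ExprSketch c : children) c.setAccumStarted(started);
    }

    @Override boolean containUDF() {
        for (ExprSketch c : children) if (c.containUDF()) return true;
        return false;
    }

    // Drives only the children that contain a UDF; null means "not accumulating".
    ExprResult accumChild(List<Long> batch) {
        if (!accumStarted) return null;
        for (ExprSketch c : children) if (c.containUDF()) c.getNext(batch);
        return new ExprResult(ExprStatus.STATUS_BATCH_OK, null);
    }

    @Override ExprResult getNext(List<Long> input) {
        ExprResult batchResult = accumChild(input);
        if (batchResult != null) return batchResult;
        long sum = 0; // regular logic: evaluate both children and add
        for (ExprSketch c : children) sum += c.getNext(input).value;
        return new ExprResult(ExprStatus.STATUS_OK, sum);
    }
}
```

In this sketch the constant child is never touched during accumulation, which is the reason for checking containUDF() before driving a child.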
+ 
+ === Builtin UDFs ===
+  . The following UDFs are changed to implement the Accumulator interface.
+   * MAX
+   * IntMax
+   * LongMax
+   * DoubleMax
+   * FloatMax
+   * StringMax
+   * SUM
+   * IntSum
+   * LongSum
+   * DoubleSum
+   * FloatSum
+   * MIN
+   * IntMin
+   * DoubleMin
+   * LongMin
+   * FloatMin
+   * StringMin
+   * AVG
+   * IntAvg
+   * LongAvg
+   * FloatAvg
+   * DoubleAvg
+   * COUNT_STAR
+   * COUNT
+ 
