Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by Shravan Narayanamurthy:
http://wiki.apache.org/pig/PigReporting

------------------------------------------------------------------------------
  }}}
  
  == Changes to current code ==
- In the mapReduceLayer, we have to implement this interface with a class that 
wraps the Hadoop reporter. To minimize changes to the code, I am planning to 
have a static variable in PhysicalOperator class which will be set by the map & 
reduce functions at the beginning. Changes to the currently implemented 
operators would involve adding a call to the progress method of this static 
variable as soon as it starts processing a tuple. For ex., in POFilter, the 
current code would be changed to:
+ In the mapReduceLayer, we have to implement this interface with a class that 
wraps the Hadoop reporter. To minimize changes to the code, I am planning to 
have a static variable in the PhysicalOperator class, which the map and reduce 
functions will set at the beginning. Also, the processInput method would call 
PhysicalOperator.reporter.progress(), so operators that use processInput need 
not worry about reporting. Only operators that have a special processing model 
because of multiple inputs would have to change: they add a call to the 
progress method of this static variable as soon as they start processing a 
tuple. For example, in POUnion, the current code would be changed to:
  
  Current:
  {{{
- while (true) {
+ while(true){
-             inp = processInput();
+             if (done.nextClearBit(0) >= inputs.size()) {
+                 res = new Result();
-             if (inp.returnStatus == POStatus.STATUS_EOP
+                 res.returnStatus = POStatus.STATUS_EOP;
-                     || inp.returnStatus == POStatus.STATUS_ERR)
+                 clearDone();
-                 break;
+                 return res;
+             }
  ...
  }}}
  
  Changed to:
  {{{
- while (true) {
+ while(true){
-             inp = processInput();
              PhysicalOperator.reporter.progress();
+             if (done.nextClearBit(0) >= inputs.size()) {
+                 res = new Result();
-             if (inp.returnStatus == POStatus.STATUS_EOP
+                 res.returnStatus = POStatus.STATUS_EOP;
-                     || inp.returnStatus == POStatus.STATUS_ERR)
+                 clearDone();
-                 break;
+                 return res;
+             }
  ...
  }}}
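
  The overall plan can be sketched end to end as below. This is only a rough illustration under stated assumptions, not the actual Pig code: ProgressReporter, HadoopReporterStub, MapReduceReporter, PhysicalOperatorSketch, ListSource, PassThrough and UnionSketch are all hypothetical names, the Hadoop reporter is replaced by a stub so nothing outside the JDK is needed, and tuples are plain Integers with null standing in for end of input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the reporting interface defined earlier on this page.
interface ProgressReporter {
    void progress();
}

// Hypothetical stand-in for the Hadoop reporter, so the sketch needs no Hadoop jars.
interface HadoopReporterStub {
    void progress();
}

// mapReduceLayer side: implements the interface by wrapping the (stubbed) Hadoop reporter.
class MapReduceReporter implements ProgressReporter {
    private final HadoopReporterStub hadoopReporter;
    MapReduceReporter(HadoopReporterStub hadoopReporter) { this.hadoopReporter = hadoopReporter; }
    @Override public void progress() { hadoopReporter.progress(); }
}

abstract class PhysicalOperatorSketch {
    // Static variable set once by the map/reduce function; defaults to a no-op.
    static ProgressReporter reporter = () -> { };
    protected final List<PhysicalOperatorSketch> inputs = new ArrayList<>();

    // Returns the next tuple, or null at end of input (a simplification).
    abstract Integer getNext();

    // Single-input operators report progress here, so they need no extra code.
    protected Integer processInput() {
        reporter.progress();
        return inputs.get(0).getNext();
    }
}

// Leaf operator that replays a fixed list of tuples.
class ListSource extends PhysicalOperatorSketch {
    private final Iterator<Integer> it;
    ListSource(List<Integer> data) { this.it = data.iterator(); }
    @Override Integer getNext() { return it.hasNext() ? it.next() : null; }
}

// Single-input operator: relies entirely on processInput for reporting.
class PassThrough extends PhysicalOperatorSketch {
    PassThrough(PhysicalOperatorSketch input) { inputs.add(input); }
    @Override Integer getNext() { return processInput(); }
}

// Multi-input operator: does not use processInput, so it reports explicitly.
class UnionSketch extends PhysicalOperatorSketch {
    private int current = 0;
    UnionSketch(List<? extends PhysicalOperatorSketch> ins) { inputs.addAll(ins); }
    @Override Integer getNext() {
        while (true) {
            PhysicalOperatorSketch.reporter.progress();
            if (current >= inputs.size()) return null;   // all inputs exhausted
            Integer t = inputs.get(current).getNext();
            if (t != null) return t;
            current++;                                   // move on to the next input
        }
    }
}

class ReportingSketch {
    // Pulls tuples from the operator until it signals end of input.
    static List<Integer> drain(PhysicalOperatorSketch op) {
        List<Integer> out = new ArrayList<>();
        for (Integer t = op.getNext(); t != null; t = op.getNext()) out.add(t);
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // What the map/reduce function would do at the beginning.
        PhysicalOperatorSketch.reporter = new MapReduceReporter(calls::incrementAndGet);
        PhysicalOperatorSketch union = new UnionSketch(List.of(
                new PassThrough(new ListSource(List.of(1, 2))),
                new PassThrough(new ListSource(List.of(3)))));
        System.out.println(drain(union) + " tuples, " + calls.get() + " progress calls");
    }
}
```

  The static field keeps the change small, as described above, but it also means that every operator running in the same JVM shares one reporter. The sketch accepts that trade-off rather than threading a reporter through each constructor.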
  
