Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by AntonioMagnaghi:
http://wiki.apache.org/pig/PigExecutionModel

------------------------------------------------------------------------------
  
  The Package just takes each (key, list of values) pair and puts it in the 
appropriate format as required by the co-group. So let's say we have (1,R),(2,G) 
in A and (1,B), (2,Y) in B. If there are two reducers, the Global Rearrange 
catering to reducer 1 will have {1,{(1,R),(1,B)}} as its (key, list of values) 
pair, which should be converted into an output tuple for co-group based on the 
tagged index of the tuples in the list. So this would be converted to 
{1,{(1,R)},{(1,B)}}. Similarly, {2,{(2,G),(2,Y)}} will be converted to 
{2,{(2,G)},{(2,Y)}} by reducer 2.
  
+ === Logical to Physical Stubs and APIs ===
+ 
+ Below are code samples that define possible APIs for the physical pipeline 
implementation. The fragments aim to validate how the pull model and splitting 
can be used to generate a physical plan from a logical plan.
+ 
+ The sample code does not take advantage of type information at this point. 
An actual implementation would use the type information made available by the 
logical plan.
+ 
+ The example focuses on the FILTER operator, the pull model (via iterators), 
and splitting.
+ 
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ import java.util.Vector;
+ 
+ /**
+  * Base class of the physical pipeline. Operators follow a pull model:
+  * a consumer obtains values by calling iterator() on its producer.
+  */
+ public abstract class PhysicalOperator {
+ 
+       // upstream operators; subclasses may register them here
+       public Vector<PhysicalOperator> inputs = new Vector<PhysicalOperator>();
+ 
+       /** Creates an operator whose input list starts out empty. */
+       public PhysicalOperator() {
+               // inputs is already initialized at its declaration
+       }
+ 
+       /** Returns an iterator over the values produced by this operator. */
+       public abstract Iterator<Object> iterator();
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ 
+ /**
+  * Physical FILTER: yields the values of {@code input} whose corresponding
+  * value from {@code condition} is {@code Boolean.TRUE}. The two iterators
+  * are consumed in lock-step, one condition value per input value.
+  */
+ public class POFilter extends PhysicalOperator {
+ 
+       public PhysicalOperator input;
+       public PhysicalOperator condition;
+ 
+       public POFilter(PhysicalOperator input,
+                       PhysicalOperator condition) {
+               super();
+               this.input = input;
+               this.condition = condition;
+               // register the input like other operators (e.g. POSplit) do
+               inputs.add(input);
+       }
+ 
+       /**
+        * Looks ahead to the next accepted value so that hasNext() only
+        * reports true when next() can really deliver one.
+        */
+       private static class POFilterIterator implements Iterator<Object> {
+               private final Iterator<Object> inputIter;
+               private final Iterator<Object> condIter;
+               // next accepted value, valid only while hasPending is true
+               private Object pending;
+               private boolean hasPending;
+ 
+               public POFilterIterator(Iterator<Object> inputIter,
+                                       Iterator<Object> condIter) {
+                       this.inputIter = inputIter;
+                       this.condIter = condIter;
+               }
+ 
+               /**
+                * Pulls input/condition pairs until an accepted value is
+                * buffered or the input is exhausted.
+                *
+                * @return true if an accepted value is buffered
+                * @throws IllegalStateException if the condition runs out
+                *         before the input does
+                */
+               private boolean fetch() {
+                       while (!hasPending && inputIter.hasNext()) {
+                               if (!condIter.hasNext()) {
+                                       throw new IllegalStateException("Condition exhausted before input");
+                               }
+                               // the condition is pulled before its input value,
+                               // as in the original pull order; a null condition
+                               // value is treated as "not matching"
+                               Boolean keep = (Boolean) condIter.next();
+                               Object val = inputIter.next();
+                               if (keep != null && keep.booleanValue()) {
+                                       pending = val;
+                                       hasPending = true;
+                               }
+                       }
+                       return hasPending;
+               }
+ 
+               public boolean hasNext() {
+                       return fetch();
+               }
+ 
+               public Object next() {
+                       if (!fetch()) {
+                               throw new java.util.NoSuchElementException();
+                       }
+                       Object result = pending;
+                       pending = null;
+                       hasPending = false;
+                       return result;
+               }
+ 
+               public void remove() {
+                       throw new UnsupportedOperationException("Not supported");
+               }
+       }
+ 
+       /** Returns a filtering iterator over fresh input/condition iterators. */
+       public Iterator<Object> iterator() {
+               return new POFilterIterator(input.iterator(),
+                                           condition.iterator());
+       }
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ import java.util.Vector;
+ import java.util.Map;
+ import java.util.HashMap;
+ 
+ /**
+  * Shares one physical input among several POSplitReaders. Values pulled
+  * from the input are cached so each reader sees the same sequence without
+  * the input being recomputed.
+  */
+ public class POSplit extends PhysicalOperator {
+ 
+       // most naive implementation:
+       // all is cached, saving comes from sharing the same
+       // values without recomputing them from the split input
+       //
+       Vector<Object> buffer;
+ 
+       // key = split reader
+       // val = position in buffer of the last value handed to that
+       //       reader (-1 = nothing read yet)
+       //
+       Map<POSplitReader, Integer> readerMap;
+ 
+       // single shared iterator over the split input; opened by the first
+       // reader that asks for an iterator and never restarted afterwards
+       //
+       Iterator<Object> iter;
+ 
+       public POSplit(PhysicalOperator input) {
+               super();
+               inputs.add(input);
+               buffer = new Vector<Object>();
+               readerMap = new HashMap<POSplitReader, Integer>();
+               iter = null;
+       }
+ 
+       /**
+        * Creates and registers {@code num} new readers of this split.
+        *
+        * @param num number of readers to create
+        * @return the readers, each positioned before the first value
+        */
+       public PhysicalOperator[] addReaders(int num) {
+               PhysicalOperator[] result = new PhysicalOperator[num];
+               for (int i = 0; i < num; ++i) {
+                       POSplitReader reader = new POSplitReader(this);
+                       readerMap.put(reader, Integer.valueOf(-1));
+                       result[i] = reader;
+               }
+               return result;
+       }
+ 
+       /** Per-reader cursor over the shared buffer. */
+       private static class POSplitIterator implements Iterator<Object> {
+ 
+               private final POSplitReader reader;
+               private final POSplit source;
+ 
+               public POSplitIterator(POSplitReader reader,
+                                      POSplit source) {
+                       this.reader = reader;
+                       this.source = source;
+               }
+ 
+               public boolean hasNext() {
+                       int nextPos = source.readerMap.get(reader).intValue() + 1;
+                       // either cached already (possibly by another reader)
+                       // or still obtainable from the shared input
+                       return nextPos < source.buffer.size()
+                               || source.iter.hasNext();
+               }
+ 
+               public Object next() {
+                       int nextPos = source.readerMap.get(reader).intValue() + 1;
+ 
+                       if (nextPos >= source.buffer.size()) {
+                               if (!source.iter.hasNext()) {
+                                       throw new java.util.NoSuchElementException();
+                               }
+                               // cache every value, including null, so that all
+                               // readers observe the same sequence
+                               source.buffer.add(source.iter.next());
+                       }
+ 
+                       source.readerMap.put(reader, Integer.valueOf(nextPos));
+                       return source.buffer.elementAt(nextPos);
+               }
+ 
+               public void remove() {
+                       throw new UnsupportedOperationException("Unsupported");
+               }
+       }
+ 
+       /**
+        * Returns an iterator for {@code reader} that starts from the first
+        * value of the split input, sharing cached values with other readers.
+        *
+        * @throws IllegalArgumentException if reader was not created by
+        *         addReaders on this split
+        */
+       public Iterator<Object> iterator(POSplitReader reader) {
+               if (!readerMap.containsKey(reader)) {
+                       throw new IllegalArgumentException("Reader does not belong to this split");
+               }
+               if (iter == null) {
+                       // open the input only once: re-opening it per reader
+                       // would recompute it and desynchronize the buffer
+                       iter = inputs.get(0).iterator();
+               }
+               // everything pulled so far is cached, so restart from the top
+               readerMap.put(reader, Integer.valueOf(-1));
+               return new POSplitIterator(reader, this);
+       }
+ 
+       public Iterator<Object> iterator() {
+               throw new UnsupportedOperationException("Do not Iterate directly but Use a Split Reader");
+       }
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ 
+ public class POSplitReader extends PhysicalOperator {
+ 
+       public POSplit source;
+       
+       public POSplitReader(POSplit source) {
+               super();
+               this.source = source;
+       }
+       
+       public Iterator<Object> iterator() {
+               return source.iterator(this);
+       }
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ /**
+  * Entry point that turns a logical plan into a physical operator tree.
+  */
+ public class PhysicalCompiler {
+ 
+       /**
+        * Compiles the logical plan rooted at {@code lo}.
+        *
+        * @param lo root of the logical plan
+        * @return the physical operator produced by the IR compilation
+        */
+       public static PhysicalOperator compile(LogicalOperator lo) {
+               return new IR(lo).compile();
+       }
+ }
+ 
+ ---
+ 
+ package org.apache.pig.optimization;
+ 
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.Vector;
+ 
+ /**
+  * In some cases we may need some pre-processing...
+  *
+  */
+ class IR {
+       LogicalOperator root;
+       
+       // key = logical node 
+       // val = vector of nodes under root that match key
+       //
+       Map<LogicalOperator, Vector<LogicalOperator>> nodeMap;
+       
+       // key = logical node
+       // val = vector of physical operators to use in the translation
+       //       process
+       //
+       Map<LogicalOperator, Vector<PhysicalOperator>> translationMap;
+       
+       IR(LogicalOperator root) {
+               this.root = root;
+               this.nodeMap = new HashMap<LogicalOperator,
+                                                Vector<LogicalOperator>>();
+               this.translationMap = new HashMap<LogicalOperator,
+                                                         
Vector<PhysicalOperator>>();
+       }       
+       
+       private void doMapNode(LogicalOperator root,
+                                                  LogicalOperator nodeToMap) {
+               if (root.equals(nodeToMap)) {
+                       Vector<LogicalOperator> map = nodeMap.get(nodeToMap);
+                       
+                       map.add(root);
+               }
+               else {
+                       for (LogicalOperator newRoot : root.inputs) {
+                               doMapNode(newRoot, nodeToMap);
+                       }
+               }
+       }
+       
+       public void mapNode(LogicalOperator nodeToMap) {
+               if (nodeMap.get(nodeToMap) == null) {
+                       nodeMap.put(nodeToMap, new Vector<LogicalOperator>());
+                       
+                       doMapNode(root, nodeToMap);
+               }
+       }
+       
+       public int getNodeMapOccurence(LogicalOperator mapNode) {
+               mapNode(mapNode);
+               
+               Vector<LogicalOperator> map = nodeMap.get(mapNode);
+               
+               if (map != null) {
+                       return map.size();
+               }
+               else {
+                       return 0;
+               }
+       }
+       
+       public void translateNodeTo(LogicalOperator nodeToTranslate,
+                   PhysicalOperator[] translation) {
+               Vector<PhysicalOperator> vec = new Vector<PhysicalOperator>();
+               
+               for (PhysicalOperator po : translation) {
+                       vec.add(po);
+               }
+               
+               translateNodeTo(nodeToTranslate,
+                                  vec);
+       }
+ 
+       
+       public void translateNodeTo(LogicalOperator nodeToTranslate,
+                                        Vector<PhysicalOperator> translation) {
+               mapNode(nodeToTranslate);
+               
+               Vector<LogicalOperator> map = nodeMap.get(nodeToTranslate);
+               
+               if (map.size() != translation.size()) {
+                       throw new RuntimeException("Mismatch!");
+               }
+               
+               translationMap.put(nodeToTranslate, translation);
+       }
+       
+       private POFilter compileLOFilter(LOFilter filter) {
+               assert(filter.condition.type == 
LogicalOperator.PIG_TYPE.BOOLEAN);
+               
+               PhysicalOperator physicalInput = (new 
IR(filter.input)).compile();
+               
+               IR booleanCondIR = new IR(filter.condition);
+               
+               POSplit split = new POSplit(physicalInput);
+               
+               PhysicalOperator splitReads[] = 
+                               split.addReaders(1 +
+                                                   
booleanCondIR.getNodeMapOccurence(filter.input));
+               
+               booleanCondIR.translateNodeTo(filter.input, splitReads);
+               
+               PhysicalOperator booleanCond = booleanCondIR.compile();
+               
+               POFilter result = new POFilter(physicalInput,
+                                                    booleanCond);
+               
+               return result;
+       }
+ 
+       public PhysicalOperator compile() {
+               if (root instanceof LOFilter) {
+                       return compileLOFilter((LOFilter) root);
+               }
+               else {
+                       throw new RuntimeException("Unsupported Logical 
Operator");
+               }
+       }
+ }
+ }}}
+ 

Reply via email to