Author: olga
Date: Fri Sep 26 14:25:02 2008
New Revision: 699506

URL: http://svn.apache.org/viewvc?rev=699506&view=rev
Log:
missing illustrate files

Added:
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=699506&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
 Fri Sep 26 14:25:02 2008
@@ -0,0 +1,145 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+public class LocalLogToPhyTranslationVisitor extends 
LogToPhyTranslationVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+    
+    public LocalLogToPhyTranslationVisitor(LogicalPlan plan) {
+       super(plan);
+       // TODO Auto-generated constructor stub
+    }
+    
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+       return LogToPhyMap;
+    }
+    
+    @Override
+    public void visit(LOCogroup cg) throws VisitorException {
+       String scope = cg.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cg.getInputs();
+        
+        POCogroup poc = new POCogroup(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        
+        currentPlan.add(poc);
+        
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator lo : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) 
cg.getGroupByPlans().get(lo);
+            
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), cg
+                    .getRequestedParallelism());
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add((PhysicalPlan) currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            physOp.setPlans(exprPlans);
+            physOp.setIndex(count++);
+            if (plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(LogToPhyMap.get(lo), physOp);
+                currentPlan.connect(physOp, poc);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+            
+        }
+        LogToPhyMap.put(cg, poc);
+    }
+    
+    @Override
+    public void visit(LOSplit split) throws VisitorException {
+       String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        
+        LogToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + 
e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    public void visit(LOSplitOutput split) throws VisitorException {
+       String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplitOutput(new OperatorKey(scope, 
nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        LogToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        currentPlans.push(currentPlan);
+        currentPlan = new PhysicalPlan();
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(split.getConditionPlan());
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+
+        ((POSplitOutput) physOp).setPlan((PhysicalPlan) currentPlan);
+        currentPlan = currentPlans.pop();
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + 
e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=699506&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
 Fri Sep 26 14:25:02 2008
@@ -0,0 +1,219 @@
+package 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * This is a local implementation of Cogroup.
+ * The inputs need to be connected to LocalRearranges, possibly by the
+ * logical to physical translator.
+ *
+ * This is a blocking operator. The outputs of the LRs are put into
+ * SortedDataBags, sorted on the keys. Tuples are then pulled from these bags
+ * in key order to construct the output, one output tuple per distinct key.
+ *
+ * Each input tuple is read as: field 1 = group key, field 2 = the value
+ * tuple that goes into the output bag.
+ *
+ * @author shubhamc
+ *
+ */
+public class POCogroup extends PhysicalOperator {
+
+    /** Orders input tuples by their group key (field 1). Stateless, so shared. */
+    private static final Comparator<Tuple> KEY_ORDER = new groupComparator();
+
+    /** Next unconsumed tuple of each input; null once that input is exhausted. */
+    Tuple[] data = null;
+
+    /** Iterator over the sorted bag of each input; null until inputs are accumulated. */
+    Iterator<Tuple>[] its = null;
+
+    public POCogroup(OperatorKey k) {
+        super(k);
+    }
+
+    public POCogroup(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCogroup(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCogroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "POCogroup" + "[" + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    /**
+     * Returns the cogrouped tuple for the next smallest key.
+     *
+     * On the first call (and on the first call after an end-of-processing
+     * result) all inputs are drained and sorted. Each OK result is a tuple
+     * with {@code inputs + 1} fields: field 0 is the key and field
+     * {@code i + 1} is a bag holding the values of input {@code i} for that
+     * key (empty if the input has none).
+     *
+     * @param t not used by this operator
+     * @return a result with STATUS_OK and the output tuple, or STATUS_EOP
+     *         once every input is exhausted
+     * @throws ExecException if an input reports an error or a tuple field
+     *         cannot be read or written
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (its == null) {
+            accumulateData();
+        }
+
+        Result res = new Result();
+
+        // getSmallest returns null exactly when every input is exhausted.
+        Tuple smallestTuple = getSmallest(data);
+        if (smallestTuple == null) {
+            res.returnStatus = POStatus.STATUS_EOP;
+            its = null;
+            return res;
+        }
+
+        int size = data.length;
+
+        Tuple output = TupleFactory.getInstance().newTuple(size + 1);
+        output.set(0, smallestTuple.get(1));
+        for (int i = 1; i <= size; i++) {
+            output.set(i, BagFactory.getInstance().newDefaultBag());
+        }
+
+        ExampleTuple tOut = null;
+        if (lineageTracer != null) {
+            tOut = new ExampleTuple(output);
+            lineageTracer.insert(tOut);
+        }
+
+        // Round-robin over the inputs, moving one tuple per input per pass
+        // into the output while that input's head still carries the current
+        // key. Stops after a full pass in which no input matched.
+        boolean loop = true;
+        while (loop) {
+            loop = false;
+            for (int i = 0; i < size; i++) {
+                if (data[i] == null || KEY_ORDER.compare(data[i], smallestTuple) != 0) {
+                    continue;
+                }
+                loop = true;
+                DataBag bag = (DataBag) output.get(i + 1);
+                Tuple temp = (Tuple) data[i].get(2);
+                // update lineage if it exists
+                if (lineageTracer != null) {
+                    if (((ExampleTuple) temp).synthetic) {
+                        tOut.synthetic = true;
+                    }
+                    lineageTracer.union(temp, tOut);
+                }
+                bag.add(temp);
+                data[i] = its[i].hasNext() ? its[i].next() : null;
+            }
+        }
+
+        res.result = (lineageTracer != null) ? tOut : output;
+        res.returnStatus = POStatus.STATUS_OK;
+        return res;
+    }
+
+    /**
+     * Drains every input into a bag sorted by key and positions
+     * {@link #data} on the first tuple of each bag.
+     *
+     * @throws ExecException if any input returns STATUS_ERR
+     */
+    @SuppressWarnings("unchecked") // generic array creation is not possible; only Iterator<Tuple> is stored
+    private void accumulateData() throws ExecException {
+        int size = inputs.size();
+        its = new Iterator[size];
+        data = new Tuple[size];
+        for (int i = 0; i < size; i++) {
+            DataBag bag = new SortedDataBag(new groupComparator());
+            for (Result input = inputs.get(i).getNext(dummyTuple);
+                    input.returnStatus != POStatus.STATUS_EOP;
+                    input = inputs.get(i).getNext(dummyTuple)) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Cogroup operator");
+                }
+                bag.add((Tuple) input.result);
+            }
+            its[i] = bag.iterator();
+            // An input may be empty; calling next() unconditionally would
+            // throw NoSuchElementException.
+            data[i] = its[i].hasNext() ? its[i].next() : null;
+        }
+    }
+
+    /**
+     * Finds the tuple with the smallest key among the non-null entries.
+     *
+     * @param data the current head tuple of each input; entries may be null
+     * @return the entry with the smallest key (the first one on ties), or
+     *         null if every entry is null
+     */
+    private Tuple getSmallest(Tuple[] data) {
+        Tuple t = null;
+        for (int i = 0; i < data.length; i++) {
+            if (data[i] == null) {
+                continue;
+            }
+            // Replace the candidate only when it is strictly greater than
+            // data[i]; the opposite test would select the largest key.
+            if (t == null || KEY_ORDER.compare(t, data[i]) > 0) {
+                t = data[i];
+            }
+        }
+        return t;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Compares two input tuples by their key (field 1) only, which is all
+     * that grouping needs.
+     */
+    private static class groupComparator implements Comparator<Tuple> {
+
+        public int compare(Tuple o1, Tuple o2) {
+            Object t1 = null;
+            Object t2 = null;
+            try {
+                t1 = o1.get(1);
+                t2 = o2.get(1);
+            } catch (ExecException e) {
+                // Comparator.compare cannot throw checked exceptions; keep
+                // the cause so the failure can be diagnosed.
+                throw new RuntimeException("Error comparing tuples", e);
+            }
+            return DataType.compare(t1, t2);
+        }
+
+        /**
+         * All instances impose the same ordering, so any two are equal.
+         * (The earlier version called itself and never returned.)
+         */
+        @Override
+        public boolean equals(Object obj) {
+            return obj instanceof groupComparator;
+        }
+
+        @Override
+        public int hashCode() {
+            return groupComparator.class.hashCode();
+        }
+
+    }
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=699506&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
 Fri Sep 26 14:25:02 2008
@@ -0,0 +1,87 @@
+package 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POSplit extends PhysicalOperator {
+    
+    
+    /**
+     * POSplit is a blocking operator. It reads the data from its input into a 
databag and then returns the iterator
+     * of that bag to POSplitOutputs which do the necessary filtering
+     */
+    private static final long serialVersionUID = 1L;
+
+    DataBag data = null;
+    
+    boolean processingDone = false;
+
+    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+       super(k, rp, inp);
+       // TODO Auto-generated constructor stub
+       data = BagFactory.getInstance().newDefaultBag();
+    }
+
+    public POSplit(OperatorKey k, int rp) {
+       this(k, rp, null);
+    }
+
+    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
+       
+       this(k, -1, inp);
+    }
+
+    public POSplit(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public Result getNext(Tuple t) throws ExecException{
+       if(!processingDone) {
+           for(Result input = inputs.get(0).getNext(dummyTuple); 
input.returnStatus != POStatus.STATUS_EOP; input = 
inputs.get(0).getNext(dummyTuple)) {
+               if(input.returnStatus == POStatus.STATUS_ERR) {
+                   throw new ExecException("Error accumulating output at local 
Split operator");
+               }
+               data.add((Tuple) input.result);
+           }
+           processingDone = true;
+       }
+
+       Result res = new Result();
+       res.returnStatus = POStatus.STATUS_OK;
+       res.result = data.iterator();
+       return res;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+       v.visitSplit(this);
+    }
+
+    @Override
+    public String name() {
+       return "Split - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+       // TODO Auto-generated method stub
+       return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+       // TODO Auto-generated method stub
+       return true;
+    }
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java?rev=699506&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
 Fri Sep 26 14:25:02 2008
@@ -0,0 +1,114 @@
+package 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * POSplitOutput reads from POSplit using an iterator and returns only the
+ * tuples for which its condition plan evaluates to true.
+ */
+public class POSplitOutput extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Leaf of {@link #compPlan}; evaluated once per tuple to get the condition's value. */
+    PhysicalOperator compOp;
+
+    /** Physical plan of the split condition. */
+    PhysicalPlan compPlan;
+
+    /** Iterator obtained from the input on first use; null until then. */
+    Iterator<Tuple> it;
+
+    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POSplitOutput(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POSplitOutput(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // Intentionally empty in this revision.
+        // NOTE(review): nothing is dispatched to the visitor here, unlike the
+        // other local operators - confirm whether PhyPlanVisitor should get a
+        // callback for this operator.
+    }
+
+    /**
+     * Returns the next tuple from the input iterator that satisfies the
+     * condition plan.
+     *
+     * @param t passed through to the input operator when the iterator is
+     *        first requested
+     * @return STATUS_OK with the next matching tuple; STATUS_EOP when the
+     *         iterator is exhausted; the input's own result if it did not
+     *         deliver an iterator; or the condition's result if its status
+     *         is neither STATUS_OK nor STATUS_NULL
+     * @throws ExecException if the input or the condition plan fails
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the input's result is cast to Iterator<Tuple> unchecked
+    public Result getNext(Tuple t) throws ExecException {
+        if (it == null) {
+            Result upstream = getInputs().get(0).getNext(t);
+            if (upstream.returnStatus != POStatus.STATUS_OK) {
+                // No iterator was delivered. Hand the status to the caller
+                // instead of dereferencing a null iterator below.
+                return upstream;
+            }
+            it = (Iterator<Tuple>) upstream.result;
+        }
+
+        Result inp = new Result();
+        while (it.hasNext()) {
+            inp.result = it.next();
+            inp.returnStatus = POStatus.STATUS_OK;
+
+            compPlan.attachInput((Tuple) inp.result);
+
+            Result res = compOp.getNext(dummyBool);
+            if (res.returnStatus != POStatus.STATUS_OK
+                    && res.returnStatus != POStatus.STATUS_NULL) {
+                return res;
+            }
+
+            // A null condition result counts as "does not match".
+            if (res.result != null && (Boolean) res.result) {
+                if (lineageTracer != null) {
+                    ExampleTuple tIn = (ExampleTuple) inp.result;
+                    lineageTracer.insert(tIn);
+                    lineageTracer.union(tIn, tIn);
+                }
+                return inp;
+            }
+        }
+
+        inp.returnStatus = POStatus.STATUS_EOP;
+        return inp;
+    }
+
+    @Override
+    public String name() {
+        return "POSplitOutput " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Sets the condition plan and remembers its first leaf as the operator
+     * to evaluate for each tuple.
+     *
+     * @param compPlan physical plan of the split condition; must have at
+     *        least one leaf
+     */
+    public void setPlan(PhysicalPlan compPlan) {
+        this.compPlan = compPlan;
+        this.compOp = compPlan.getLeaves().get(0);
+    }
+
+}


Reply via email to