Author: olga Date: Fri Sep 26 14:25:02 2008 New Revision: 699506 URL: http://svn.apache.org/viewvc?rev=699506&view=rev Log: missing illustrate files
// Added:
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
//   incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java

// ==============================================================================
// Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
// URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=699506&view=auto
// (new file, Fri Sep 26 14:25:02 2008)
// ==============================================================================
package org.apache.pig.backend.local.executionengine.physicalLayer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;

/**
 * Logical-to-physical translator for the local execution engine.
 *
 * It overrides the translation of {@link LOCogroup}, {@link LOSplit} and
 * {@link LOSplitOutput} so that they become the local {@link POCogroup},
 * {@link POSplit} and {@link POSplitOutput} operators; every other logical
 * operator is handled by the inherited {@link LogToPhyTranslationVisitor}.
 */
public class LocalLogToPhyTranslationVisitor extends LogToPhyTranslationVisitor {

    private Log log = LogFactory.getLog(getClass());

    public LocalLogToPhyTranslationVisitor(LogicalPlan plan) {
        super(plan);
    }

    /**
     * @return the mapping from each translated logical operator to the
     *         physical operator created for it
     */
    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
        return LogToPhyMap;
    }

    /**
     * Translates a cogroup into one {@link POLocalRearrange} per input, all
     * feeding a single {@link POCogroup}.
     *
     * @param cg the logical cogroup to translate
     * @throws VisitorException if the physical operators cannot be connected
     */
    @Override
    public void visit(LOCogroup cg) throws VisitorException {
        String scope = cg.getOperatorKey().scope;
        List<LogicalOperator> inputs = cg.getInputs();

        POCogroup poc = new POCogroup(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                cg.getRequestedParallelism());

        currentPlan.add(poc);

        int count = 0;
        Byte type = null;
        for (LogicalOperator lo : inputs) {
            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);

            POLocalRearrange physOp = new POLocalRearrange(
                    new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                    cg.getRequestedParallelism());
            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();

            // Translate every group-by expression into its own physical plan;
            // the outer plan is parked on the stack while the inner ones are built.
            currentPlans.push(currentPlan);
            for (LogicalPlan lp : plans) {
                currentPlan = new PhysicalPlan();
                PlanWalker<LogicalOperator, LogicalPlan> childWalker =
                        mCurrentWalker.spawnChildWalker(lp);
                pushWalker(childWalker);
                mCurrentWalker.walk(this);
                exprPlans.add((PhysicalPlan) currentPlan);
                popWalker();
            }
            currentPlan = currentPlans.pop();

            physOp.setPlans(exprPlans);
            physOp.setIndex(count++);
            if (plans.size() > 1) {
                // Several key expressions: the key is a tuple of them.
                type = DataType.TUPLE;
                physOp.setKeyType(type);
            } else {
                // Single key expression: the key has the type of that expression's leaf.
                type = exprPlans.get(0).getLeaves().get(0).getResultType();
                physOp.setKeyType(type);
            }
            physOp.setResultType(DataType.TUPLE);

            currentPlan.add(physOp);

            try {
                currentPlan.connect(LogToPhyMap.get(lo), physOp);
                currentPlan.connect(physOp, poc);
            } catch (PlanException e) {
                log.error("Invalid physical operators in the physical plan"
                        + e.getMessage());
                throw new VisitorException(e);
            }
        }
        LogToPhyMap.put(cg, poc);
    }

    /**
     * Translates a split into a {@link POSplit} connected to the physical
     * operator of the split's first predecessor.
     *
     * @param split the logical split to translate
     * @throws VisitorException if the physical operators cannot be connected
     */
    @Override
    public void visit(LOSplit split) throws VisitorException {
        String scope = split.getOperatorKey().scope;
        PhysicalOperator physOp = new POSplit(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                split.getRequestedParallelism());

        LogToPhyMap.put(split, physOp);

        currentPlan.add(physOp);
        PhysicalOperator from = LogToPhyMap.get(
                split.getPlan().getPredecessors(split).get(0));
        try {
            currentPlan.connect(from, physOp);
        } catch (PlanException e) {
            log.error("Invalid physical operator in the plan" + e.getMessage());
            throw new VisitorException(e);
        }
    }

    /**
     * Translates a split output into a {@link POSplitOutput} whose condition
     * plan is the physical translation of the logical condition plan, and
     * connects it to the physical operator of its first predecessor.
     *
     * @param split the logical split output to translate
     * @throws VisitorException if the physical operators cannot be connected
     */
    @Override
    public void visit(LOSplitOutput split) throws VisitorException {
        String scope = split.getOperatorKey().scope;
        PhysicalOperator physOp = new POSplitOutput(
                new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                split.getRequestedParallelism());
        LogToPhyMap.put(split, physOp);

        currentPlan.add(physOp);

        // Build the condition plan in a fresh plan, then restore the outer one.
        currentPlans.push(currentPlan);
        currentPlan = new PhysicalPlan();
        PlanWalker<LogicalOperator, LogicalPlan> childWalker =
                mCurrentWalker.spawnChildWalker(split.getConditionPlan());
        pushWalker(childWalker);
        mCurrentWalker.walk(this);
        popWalker();

        ((POSplitOutput) physOp).setPlan((PhysicalPlan) currentPlan);
        currentPlan = currentPlans.pop();
        // NOTE(review): physOp was already added to this plan above; this second
        // add is kept from the original and looks redundant - confirm before removing.
        currentPlan.add(physOp);
        PhysicalOperator from = LogToPhyMap.get(
                split.getPlan().getPredecessors(split).get(0));
        try {
            currentPlan.connect(from, physOp);
        } catch (PlanException e) {
            log.error("Invalid physical operator in the plan" + e.getMessage());
            throw new VisitorException(e);
        }
    }

}

// ==============================================================================
// Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
// URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=699506&view=auto
// (new file, Fri Sep 26 14:25:02 2008)
// ==============================================================================
package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SortedDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;

/** This is a local implementation of Cogroup.
 * The inputs need to be connected to LocalRearranges possibly by the
 * logical to physical translator.
 *
 * This is a blocking operator. The outputs of LRs are put into
 * SortedDataBags. They are sorted on the keys. We then start pulling
 * tuples out of these bags and start constructing output.
 *
 * Each input tuple is read as: field 1 = group key, field 2 = value tuple.
 * Each output tuple is (key, bag for input 0, bag for input 1, ...).
 *
 * @author shubhamc
 *
 */
public class POCogroup extends PhysicalOperator {

    /** Current head tuple of every input; null once that input is exhausted. */
    Tuple[] data = null;

    /** One iterator per input over its sorted bag; null until data is accumulated. */
    Iterator<Tuple>[] its = null;

    public POCogroup(OperatorKey k) {
        super(k);
    }

    public POCogroup(OperatorKey k, int rp) {
        super(k, rp);
    }

    public POCogroup(OperatorKey k, List<PhysicalOperator> inp) {
        super(k, inp);
    }

    public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k, rp, inp);
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitCogroup(this);
    }

    @Override
    public String name() {
        return "POCogroup" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
    }

    /**
     * Returns the next group.
     *
     * On the first call (and again after an end-of-processing result) all
     * inputs are drained into sorted bags. Each call then takes the smallest
     * key among the current head tuples and collects, per input, every tuple
     * carrying that key.
     *
     * @param t unused
     * @return STATUS_OK with the group tuple, or STATUS_EOP once every input
     *         is exhausted
     * @throws ExecException if an input reports an error or a field cannot be read
     */
    @Override
    public Result getNext(Tuple t) throws ExecException {
        if (its == null) {
            accumulateData();
        }

        boolean done = true;
        Result res = new Result();
        for (int i = 0; i < data.length; i++) {
            done &= (data[i] == null);
        }
        if (done) {
            res.returnStatus = POStatus.STATUS_EOP;
            // Dropping the iterators makes a later call re-read the inputs.
            its = null;
            return res;
        }

        Tuple smallestTuple = getSmallest(data);
        Comparator<Tuple> comp = new groupComparator();

        int size = data.length;

        Tuple output = TupleFactory.getInstance().newTuple(size + 1);

        output.set(0, smallestTuple.get(1));
        for (int i = 1; i < size + 1; i++) {
            output.set(i, BagFactory.getInstance().newDefaultBag());
        }
        ExampleTuple tOut = null;
        if (lineageTracer != null) {
            tOut = new ExampleTuple(output);
            lineageTracer.insert(tOut);
        }

        // Keep sweeping over the inputs until no head tuple matches the key any more.
        boolean loop = true;
        while (loop) {
            loop = false;
            for (int i = 0; i < size; i++) {
                if (data[i] != null && comp.compare(data[i], smallestTuple) == 0) {
                    loop = true;
                    DataBag bag = (DataBag) output.get(i + 1);
                    Tuple temp = (Tuple) data[i].get(2);
                    // update lineage if it exists
                    if (lineageTracer != null) {
                        if (((ExampleTuple) temp).synthetic) {
                            tOut.synthetic = true;
                        }
                        lineageTracer.union(temp, tOut);
                    }
                    bag.add(temp);
                    if (its[i].hasNext()) {
                        data[i] = its[i].next();
                    } else {
                        data[i] = null;
                    }
                }
            }
        }
        if (lineageTracer != null) {
            res.result = tOut;
        } else {
            res.result = output;
        }

        res.returnStatus = POStatus.STATUS_OK;

        return res;
    }

    /**
     * Drains every input into its own key-sorted bag and positions
     * {@link #data} on the first tuple of each bag.
     *
     * @throws ExecException if any input returns STATUS_ERR
     */
    @SuppressWarnings("unchecked") // generic array creation for the iterator array
    private void accumulateData() throws ExecException {
        int size = inputs.size();
        its = new Iterator[size];
        data = new Tuple[size];
        for (int i = 0; i < size; i++) {
            DataBag bag = new SortedDataBag(new groupComparator());
            for (Result input = inputs.get(i).getNext(dummyTuple);
                    input.returnStatus != POStatus.STATUS_EOP;
                    input = inputs.get(i).getNext(dummyTuple)) {
                if (input.returnStatus == POStatus.STATUS_ERR) {
                    throw new ExecException("Error accumulating output at local Cogroup operator");
                }
                bag.add((Tuple) input.result);
            }
            its[i] = bag.iterator();
            // An input may be empty: treat it as already exhausted instead of
            // calling next() on an iterator that has nothing to return.
            data[i] = its[i].hasNext() ? its[i].next() : null;
        }
    }

    /**
     * Returns the head tuple with the smallest key, ignoring exhausted inputs.
     *
     * The smallest key must be chosen so that groups come out in key order
     * and every tuple of a key lands in the same output group; picking a
     * larger key first would emit that key again later for the inputs that
     * had not reached it yet.
     *
     * @param data current head tuple of each input (null = exhausted)
     * @return the tuple with the smallest key, or null if all entries are null
     */
    private Tuple getSmallest(Tuple[] data) {
        Tuple t = null;
        Comparator<Tuple> comp = new groupComparator();

        for (int i = 0; i < data.length; i++) {
            if (data[i] == null) {
                continue;
            }
            if (t == null) {
                // first non-null candidate, nothing to compare against yet
                t = data[i];
                continue;
            }
            if (comp.compare(t, data[i]) > 0) {
                t = data[i];
            }
        }
        return t;
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    /**
     * Orders tuples by their key (field 1) only.
     *
     * equals() is intentionally inherited from Object: the previous override
     * called itself and could never return.
     */
    private static class groupComparator implements Comparator<Tuple> {

        public int compare(Tuple o1, Tuple o2) {
            // We want to make it as efficient as possible by only comparing the keys
            Object t1 = null;
            Object t2 = null;
            try {
                t1 = o1.get(1);
                t2 = o2.get(1);
            } catch (ExecException e) {
                throw new RuntimeException("Error comparing tuples", e);
            }

            return DataType.compare(t1, t2);
        }

    }

}

// ==============================================================================
// Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
// URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=699506&view=auto
// (new file, Fri Sep 26 14:25:02 2008)
// ==============================================================================
package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;

import java.util.List;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;

/**
 * POSplit is a blocking operator. It reads the data from its input into a databag and then returns the iterator
 * of that bag to POSplitOutputs which do the necessary filtering
 */
public class POSplit extends PhysicalOperator {

    private static final long serialVersionUID = 1L;

    /** Everything read from the single input. */
    DataBag data = null;

    /** True once the input has been fully read into {@link #data}. */
    boolean processingDone = false;

    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k, rp, inp);
        data = BagFactory.getInstance().newDefaultBag();
    }

    public POSplit(OperatorKey k, int rp) {
        this(k, rp, null);
    }

    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
        this(k, -1, inp);
    }

    public POSplit(OperatorKey k) {
        this(k, -1, null);
    }

    /**
     * Reads the whole input on the first call, then hands out a fresh iterator
     * over the buffered tuples on every call.
     *
     * @param t unused
     * @return STATUS_OK with an iterator over the buffered tuples as result
     * @throws ExecException if the input returns STATUS_ERR
     */
    @Override
    public Result getNext(Tuple t) throws ExecException {
        if (!processingDone) {
            for (Result input = inputs.get(0).getNext(dummyTuple);
                    input.returnStatus != POStatus.STATUS_EOP;
                    input = inputs.get(0).getNext(dummyTuple)) {
                if (input.returnStatus == POStatus.STATUS_ERR) {
                    throw new ExecException("Error accumulating output at local Split operator");
                }
                data.add((Tuple) input.result);
            }
            processingDone = true;
        }

        Result res = new Result();
        res.returnStatus = POStatus.STATUS_OK;
        res.result = data.iterator();
        return res;
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitSplit(this);
    }

    @Override
    public String name() {
        return "Split - " + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return true;
    }

}

// ==============================================================================
// Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
// URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java?rev=699506&view=auto
// (new file, Fri Sep 26 14:25:02 2008)
// ==============================================================================
package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;

import java.util.Iterator;
import java.util.List;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;

/**
 * POSplitOutput reads from POSplit using an iterator and returns only the
 * tuples for which its condition plan evaluates to true.
 */
public class POSplitOutput extends PhysicalOperator {

    private static final long serialVersionUID = 1L;

    /** Leaf operator of {@link #compPlan}; evaluated once per input tuple. */
    PhysicalOperator compOp;

    /** Condition plan set through {@link #setPlan(PhysicalPlan)}. */
    PhysicalPlan compPlan;

    /** Iterator obtained from the input operator; null until first used. */
    Iterator<Tuple> it;

    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k, rp, inp);
    }

    public POSplitOutput(OperatorKey k, int rp) {
        super(k, rp);
    }

    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
        super(k, inp);
    }

    public POSplitOutput(OperatorKey k) {
        super(k);
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        // Intentionally a no-op in the original: no visitor callback is invoked here.
    }

    /**
     * Returns the next input tuple that satisfies the condition plan.
     *
     * @param t passed through to the input operator when the iterator is fetched
     * @return STATUS_OK with the accepted tuple; STATUS_EOP when the iterator
     *         is exhausted; the input's own result if it did not return
     *         STATUS_OK when the iterator was requested; or the condition's
     *         result if it is neither STATUS_OK nor STATUS_NULL
     * @throws ExecException propagated from the input or the condition plan
     */
    @SuppressWarnings("unchecked") // the input's result is cast to Iterator<Tuple>
    @Override
    public Result getNext(Tuple t) throws ExecException {
        if (it == null) {
            Result upstream = getInputs().get(0).getNext(t);
            if (upstream.returnStatus != POStatus.STATUS_OK) {
                // No iterator available: hand the upstream status to the caller
                // rather than dereferencing a null iterator below.
                return upstream;
            }
            it = (Iterator<Tuple>) upstream.result;
        }
        Result res = null;
        Result inp = new Result();
        while (true) {
            if (it.hasNext()) {
                inp.result = it.next();
            } else {
                inp.returnStatus = POStatus.STATUS_EOP;
                return inp;
            }
            inp.returnStatus = POStatus.STATUS_OK;

            compPlan.attachInput((Tuple) inp.result);

            res = compOp.getNext(dummyBool);
            if (res.returnStatus != POStatus.STATUS_OK
                    && res.returnStatus != POStatus.STATUS_NULL) {
                return res;
            }

            if (res.result != null && (Boolean) res.result == true) {
                if (lineageTracer != null) {
                    ExampleTuple tIn = (ExampleTuple) inp.result;
                    lineageTracer.insert(tIn);
                    lineageTracer.union(tIn, tIn);
                }
                return inp;
            }
        }
    }

    @Override
    public String name() {
        return "POSplitOutput " + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    /**
     * Sets the condition plan and remembers its first leaf as the operator to
     * evaluate for each tuple.
     *
     * @param compPlan the condition plan
     */
    public void setPlan(PhysicalPlan compPlan) {
        this.compPlan = compPlan;
        this.compOp = compPlan.getLeaves().get(0);
    }

}