Author: olga Date: Wed Aug 13 16:47:34 2008 New Revision: 685730 URL: http://svn.apache.org/viewvc?rev=685730&view=rev Log: PIG-311: cross is broken
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=685730&r1=685729&r2=685730&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Wed Aug 13 16:47:34 2008 @@ -149,3 +149,5 @@ PIG-367: convinience function for UDFs to name schema PIG-368: making JobConf available to Load/Store UDFs + + PIG-311: cross is broken Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=685730&r1=685729&r2=685730&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Aug 13 16:47:34 2008 @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,7 +33,10 @@ import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; +import 
org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*; @@ -41,6 +45,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator; import org.apache.pig.builtin.BinStorage; +import org.apache.pig.impl.builtin.GFCross; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.*; @@ -503,6 +508,119 @@ } @Override + protected void visit(LOCross cs) throws VisitorException { + String scope = cs.getOperatorKey().scope; + List<LogicalOperator> inputs = cs.getInputs(); + + POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), cs + .getRequestedParallelism()); + POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen + .getNextNodeId(scope)), cs.getRequestedParallelism()); + + currentPlan.add(poGlobal); + currentPlan.add(poPackage); + + int count = 0; + + try { + currentPlan.connect(poGlobal, poPackage); + List<Boolean> flattenLst = Arrays.asList(true, true); + + for (LogicalOperator op : inputs) { + List<PhysicalOperator> pop = Arrays.asList(LogToPhyMap.get(op)); + PhysicalPlan fep1 = new PhysicalPlan(); + ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism()); + Tuple ce1val = TupleFactory.getInstance().newTuple(2); + ce1val.set(0,inputs.size()); + ce1val.set(1,count); + ce1.setValue(ce1val); + ce1.setResultType(DataType.TUPLE); + + fep1.add(ce1); + + POUserFunc gfc = new POUserFunc(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1), new FuncSpec(GFCross.class.getName())); + gfc.setResultType(DataType.BAG); + fep1.addAsLeaf(gfc); + + PhysicalPlan fep2 = new PhysicalPlan(); + POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism()); + feproj.setResultType(DataType.TUPLE); + feproj.setStar(true); + feproj.setOverloaded(false); + fep2.add(feproj); + List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2); + + POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst ); + currentPlan.add(fe); + currentPlan.connect(LogToPhyMap.get(op), fe); + + POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( + scope, nodeGen.getNextNodeId(scope)), cs + .getRequestedParallelism()); + List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>(); + for(int i=0;i<inputs.size();i++){ + PhysicalPlan lrp1 = new PhysicalPlan(); + POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i); + lrproj1.setOverloaded(false); + lrproj1.setResultType(DataType.INTEGER); + lrp1.add(lrproj1); + lrPlans.add(lrp1); + } + + physOp.setCross(true); + physOp.setIndex(count++); + physOp.setKeyType(DataType.TUPLE); + physOp.setPlans(lrPlans); + physOp.setResultType(DataType.TUPLE); + + currentPlan.add(physOp); + currentPlan.connect(fe, physOp); + currentPlan.connect(physOp, poGlobal); + } + } catch (PlanException e1) { + log.error("Invalid physical operators in the physical plan" + + e1.getMessage()); + throw new VisitorException(e1); + } catch (ExecException e) { + log.error("Unable to create the constant tuple because " + e.getMessage()); + throw new VisitorException(e); + } + + poPackage.setKeyType(DataType.TUPLE); + poPackage.setResultType(DataType.TUPLE); + poPackage.setNumInps(count); + boolean inner[] = new boolean[count]; + for 
(int i=0;i<count;i++) { + inner[i] = true; + } + poPackage.setInner(inner); + + List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>(); + List<Boolean> flattenLst = new ArrayList<Boolean>(); + for(int i=1;i<=count;i++){ + PhysicalPlan fep1 = new PhysicalPlan(); + POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i); + feproj1.setResultType(DataType.BAG); + feproj1.setOverloaded(false); + fep1.add(feproj1); + fePlans.add(fep1); + flattenLst.add(true); + } + + POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst ); + currentPlan.add(fe); + try{ + currentPlan.connect(poPackage, fe); + }catch (PlanException e1) { + log.error("Invalid physical operators in the physical plan" + + e1.getMessage()); + throw new VisitorException(e1); + } + LogToPhyMap.put(cs, fe); + } + + @Override public void visit(LOCogroup cg) throws VisitorException { boolean currentPhysicalPlan = false; String scope = cg.getOperatorKey().scope; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=685730&r1=685729&r2=685730&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Wed Aug 13 16:47:34 2008 @@ -64,6 +64,8 @@ byte keyType; private boolean mIsDistinct = false; + + private boolean isCross = false; // A place holder IndexedTuple used in distinct case where we really 
don't // have any value to pass through. But hadoop gets cranky if we pass a @@ -228,6 +230,10 @@ outPut.set(1, mFakeIndexedTuple); return outPut; } else { + if(isCross){ + for(int i=0;i<plans.size();i++) + value.getAll().remove(0); + } //Create the indexed tuple out of the value //that is remaining in the input tuple IndexedTuple it = new IndexedTuple(value, index); @@ -284,5 +290,13 @@ return clone; } + public boolean isCross() { + return isCross; + } + + public void setCross(boolean isCross) { + this.isCross = isCross; + } + }