Author: gates Date: Fri Oct 16 19:20:31 2009 New Revision: 826047 URL: http://svn.apache.org/viewvc?rev=826047&view=rev Log: PIG-858: Order By followed by "replicated" join fails while compiling MR-plan from physical plan.
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=826047&r1=826046&r2=826047&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Oct 16 19:20:31 2009 @@ -57,6 +57,9 @@ BUG FIXES +PIG-858: Order By followed by "replicated" join fails while compiling MR-plan +from physical plan (ashutoshc via gates) + PIG-968: Fix findContainingJar to work properly when there is a + in the jar path (tlipcon via gates). Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=826047&r1=826046&r2=826047&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 16 19:20:31 2009 @@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; @@ -167,6 +168,8 @@ private CompilationMessageCollector messageCollector = null; + private Map<PhysicalOperator,MapReduceOper> phyToMROpMap; + public static String USER_COMPARATOR_MARKER = "user.comparator.func:"; public MRCompiler(PhysicalPlan plan) throws MRCompilerException { @@ -193,6 +196,7 @@ scope = roots.get(0).getOperatorKey().getScope(); messageCollector = new CompilationMessageCollector() ; storeToMapReduceMap = new HashMap<POStore, MapReduceOper>(); + phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>(); } public void randomizeFileLocalizer(){ @@ -325,6 +329,7 @@ plan.disconnect(op, p); MRPlan.connect(oper, curMROp); + phyToMROpMap.put(op, curMROp); return; } @@ -351,6 +356,7 @@ curMROp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString()); } MRPlan.add(curMROp); + phyToMROpMap.put(op, curMROp); return; } @@ -690,6 +696,7 @@ * @param op - The split operator * @throws VisitorException */ + @Override public void visitSplit(POSplit op) throws VisitorException{ try{ FileSpec fSpec = op.getSplitStore(); @@ -697,6 +704,7 @@ mro.setSplitter(true); splitsSeen.put(op.getOperatorKey(), mro); curMROp = startNew(fSpec, mro); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -704,9 +712,11 @@ } } + @Override public void visitLoad(POLoad op) throws VisitorException{ try{ nonBlocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -714,10 +724,12 @@ } } + @Override public void visitStore(POStore op) throws VisitorException{ try{ storeToMapReduceMap.put(op, curMROp); nonBlocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -725,10 +737,12 @@ } } + @Override public void visitFilter(POFilter op) throws VisitorException{ try{ nonBlocking(op); addUDFs(op.getPlan()); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -736,9 +750,11 @@ } } + @Override public void visitStream(POStream op) throws VisitorException{ try{ nonBlocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -829,6 +845,7 @@ return fe; } + @Override public void visitLimit(POLimit op) throws VisitorException{ try{ @@ -860,6 +877,7 @@ messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!", MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED); } + phyToMROpMap.put(op, mro); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -867,6 +885,7 @@ } } + @Override public void visitLocalRearrange(POLocalRearrange op) throws VisitorException { try{ nonBlocking(op); @@ -874,6 +893,7 @@ if(plans!=null) for(PhysicalPlan ep : plans) addUDFs(ep); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -881,6 +901,7 @@ } } + @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ nonBlocking(op); @@ -889,6 +910,7 @@ for (PhysicalPlan plan : plans) { addUDFs(plan); } + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -896,9 +918,11 @@ } } + @Override public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{ try{ blocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -906,9 +930,11 @@ } } + @Override public void visitPackage(POPackage op) throws VisitorException{ try{ nonBlocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -916,9 +942,11 @@ } } + @Override public void visitUnion(POUnion op) throws VisitorException{ try{ nonBlocking(op); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -943,22 +971,14 @@ } op.setReplFiles(replFiles); - List<OperatorKey> opKeys = new ArrayList<OperatorKey>(op.getInputs().size()); - for (PhysicalOperator pop : op.getInputs()) { - opKeys.add(pop.getOperatorKey()); - } - int fragPlan = 0; + + curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment())); for(int i=0;i<compiledInputs.length;i++){ MapReduceOper mro = compiledInputs[i]; - OperatorKey opKey = (!mro.isMapDone()) ? mro.mapPlan.getLeaves().get(0).getOperatorKey() - : mro.reducePlan.getLeaves().get(0).getOperatorKey(); - if(opKeys.indexOf(opKey)==op.getFragment()){ - curMROp = mro; - fragPlan = i; + if(curMROp.equals(mro)) continue; - } POStore str = getStore(); - str.setSFile(replFiles[opKeys.indexOf(opKey)]); + str.setSFile(replFiles[i]); if (!mro.isMapDone()) { mro.mapPlan.addAsLeaf(str); mro.setMapDoneSingle(true); @@ -966,13 +986,10 @@ mro.reducePlan.addAsLeaf(str); mro.setReduceDone(true); } else { - int errCode = 2022; + int errCode = 2022; String msg = "Both map and reduce phases have been done. This is unexpected while compiling."; throw new PlanException(msg, errCode, PigException.BUG); } - } - for(int i=0;i<compiledInputs.length;i++){ - if(i==fragPlan) continue; MRPlan.connect(compiledInputs[i], curMROp); } @@ -996,6 +1013,7 @@ curMROp.setFrjoin(true); curMROp.setFragment(op.getFragment()); curMROp.setReplFiles(op.getReplFiles()); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -1227,6 +1245,7 @@ // We want to ensure indexing job runs prior to actual join job. So, connect them in order. MRPlan.connect(rightMROpr, curMROp); + phyToMROpMap.put(joinOp, curMROp); } catch(PlanException e){ int errCode = 2034; @@ -1298,6 +1317,7 @@ nfe1.setResultType(DataType.BAG); curMROp.reducePlan.addAsLeaf(nfe1); curMROp.setNeedsDistinctCombiner(true); + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -1305,6 +1325,7 @@ } } + @Override public void visitSkewedJoin(POSkewedJoin op) throws VisitorException { try { if (compiledInputs.length != 2) { @@ -1452,6 +1473,7 @@ fe.visit(this); curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName()); + phyToMROpMap.put(op, curMROp); }catch(PlanException e) { int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); @@ -1480,6 +1502,7 @@ if(op.isUDFComparatorUsed){ curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString()); } + phyToMROpMap.put(op, curMROp); }catch(Exception e){ int errCode = 2034; String msg = "Error compiling operator " + op.getClass().getSimpleName(); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=826047&r1=826046&r2=826047&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri Oct 16 19:20:31 2009 @@ -146,6 +146,58 @@ } + public void testSortFRJoin() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("D = ORDER A by y;"); + pigServer.registerQuery("E = ORDER B by y;"); + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join D by $0, E by $0 using \"replicated\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("C = join D by $0, E by $0;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertEquals(dbfrj.size(), dbshj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + + public void testDistinctFRJoin() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); + pigServer.registerQuery("D = distinct A ;"); + pigServer.registerQuery("E = distinct B ;"); + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join D by $0, E by $0 using \"replicated\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("C = join D by $0, E by $0;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertEquals(dbfrj.size(), dbshj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + } + @Test public void testUDFFRJ() throws IOException { pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:chararray,y:int);");