Author: gates
Date: Fri Oct 16 19:20:31 2009
New Revision: 826047

URL: http://svn.apache.org/viewvc?rev=826047&view=rev
Log:
PIG-858: Order By followed by "replicated" join fails while compiling MR-plan 
from physical plan.


Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 16 19:20:31 2009
@@ -57,6 +57,9 @@
 
 BUG FIXES
 
+PIG-858: Order By followed by "replicated" join fails while compiling MR-plan
+from physical plan (ashutoshc via gates)
+
 PIG-968: Fix findContainingJar to work properly when there is a + in the jar
          path (tlipcon via gates).
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 16 19:20:31 2009
@@ -55,6 +55,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -167,6 +168,8 @@
     
     private CompilationMessageCollector messageCollector = null;
     
+    private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
+    
     public static String USER_COMPARATOR_MARKER = "user.comparator.func:";
     
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
@@ -193,6 +196,7 @@
         scope = roots.get(0).getOperatorKey().getScope();
         messageCollector = new CompilationMessageCollector() ;
         storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
+        phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
     }
     
     public void randomizeFileLocalizer(){
@@ -325,6 +329,7 @@
 
                 plan.disconnect(op, p);
                 MRPlan.connect(oper, curMROp);
+                phyToMROpMap.put(op, curMROp);
                 return;
             }
             
@@ -351,6 +356,7 @@
                     
curMROp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
             }
             MRPlan.add(curMROp);
+            phyToMROpMap.put(op, curMROp);
             return;
         }
         
@@ -690,6 +696,7 @@
      * @param op - The split operator
      * @throws VisitorException
      */
+    @Override
     public void visitSplit(POSplit op) throws VisitorException{
         try{
             FileSpec fSpec = op.getSplitStore();
@@ -697,6 +704,7 @@
             mro.setSplitter(true);
             splitsSeen.put(op.getOperatorKey(), mro);
             curMROp = startNew(fSpec, mro);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -704,9 +712,11 @@
         }
     }
     
+    @Override
     public void visitLoad(POLoad op) throws VisitorException{
         try{
             nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -714,10 +724,12 @@
         }
     }
     
+    @Override
     public void visitStore(POStore op) throws VisitorException{
         try{
             storeToMapReduceMap.put(op, curMROp);
             nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -725,10 +737,12 @@
         }
     }
     
+    @Override
     public void visitFilter(POFilter op) throws VisitorException{
         try{
             nonBlocking(op);
             addUDFs(op.getPlan());
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -736,9 +750,11 @@
         }
     }
     
+    @Override
     public void visitStream(POStream op) throws VisitorException{
         try{
             nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -829,6 +845,7 @@
         return fe;
     }
     
+    @Override
     public void visitLimit(POLimit op) throws VisitorException{
         try{
                
@@ -860,6 +877,7 @@
                messageCollector.collect("Both map and reduce phases have been 
done. This is unexpected while compiling!",
                                MessageType.Warning, 
PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
             }
+            phyToMROpMap.put(op, mro);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -867,6 +885,7 @@
         }
     }
 
+    @Override
     public void visitLocalRearrange(POLocalRearrange op) throws 
VisitorException {
         try{
             nonBlocking(op);
@@ -874,6 +893,7 @@
             if(plans!=null)
                 for(PhysicalPlan ep : plans)
                     addUDFs(ep);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -881,6 +901,7 @@
         }
     }
     
+    @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
             nonBlocking(op);
@@ -889,6 +910,7 @@
                 for (PhysicalPlan plan : plans) {
                     addUDFs(plan);
                 }
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -896,9 +918,11 @@
         }
     }
     
+    @Override
     public void visitGlobalRearrange(POGlobalRearrange op) throws 
VisitorException{
         try{
             blocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -906,9 +930,11 @@
         }
     }
     
+    @Override
     public void visitPackage(POPackage op) throws VisitorException{
         try{
             nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -916,9 +942,11 @@
         }
     }
     
+    @Override
     public void visitUnion(POUnion op) throws VisitorException{
         try{
             nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -943,22 +971,14 @@
             }
             op.setReplFiles(replFiles);
             
-            List<OperatorKey> opKeys = new 
ArrayList<OperatorKey>(op.getInputs().size());
-            for (PhysicalOperator pop : op.getInputs()) {
-                opKeys.add(pop.getOperatorKey());
-            }
-            int fragPlan = 0;
+
+            curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
             for(int i=0;i<compiledInputs.length;i++){
                 MapReduceOper mro = compiledInputs[i];
-                OperatorKey opKey = (!mro.isMapDone()) ?  
mro.mapPlan.getLeaves().get(0).getOperatorKey()
-                                                       :  
mro.reducePlan.getLeaves().get(0).getOperatorKey();
-                if(opKeys.indexOf(opKey)==op.getFragment()){
-                    curMROp = mro;
-                    fragPlan = i;
+                if(curMROp.equals(mro))
                     continue;
-                }
                 POStore str = getStore();
-                str.setSFile(replFiles[opKeys.indexOf(opKey)]);
+                str.setSFile(replFiles[i]);
                 if (!mro.isMapDone()) {
                     mro.mapPlan.addAsLeaf(str);
                     mro.setMapDoneSingle(true);
@@ -966,13 +986,10 @@
                     mro.reducePlan.addAsLeaf(str);
                     mro.setReduceDone(true);
                 } else {
-                       int errCode = 2022;
+                    int errCode = 2022;
                     String msg = "Both map and reduce phases have been done. 
This is unexpected while compiling.";
                     throw new PlanException(msg, errCode, PigException.BUG);
                 }
-            }
-            for(int i=0;i<compiledInputs.length;i++){
-                if(i==fragPlan) continue;
                 MRPlan.connect(compiledInputs[i], curMROp);
             }
             
@@ -996,6 +1013,7 @@
             curMROp.setFrjoin(true);
             curMROp.setFragment(op.getFragment());
             curMROp.setReplFiles(op.getReplFiles());
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -1227,6 +1245,7 @@
 
             // We want to ensure indexing job runs prior to actual join job. 
So, connect them in order.
             MRPlan.connect(rightMROpr, curMROp);
+            phyToMROpMap.put(joinOp, curMROp);
         }
         catch(PlanException e){
             int errCode = 2034;
@@ -1298,6 +1317,7 @@
             nfe1.setResultType(DataType.BAG);
             curMROp.reducePlan.addAsLeaf(nfe1);
             curMROp.setNeedsDistinctCombiner(true);
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -1305,6 +1325,7 @@
         }
     }
     
+    @Override
     public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
                try {
                        if (compiledInputs.length != 2) {
@@ -1452,6 +1473,7 @@
                        fe.visit(this);
                        
                        
curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
+                       phyToMROpMap.put(op, curMROp);
         }catch(PlanException e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
@@ -1480,6 +1502,7 @@
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
             }
+            phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + 
op.getClass().getSimpleName();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri Oct 16 19:20:31 2009
@@ -146,6 +146,58 @@
         
     }
     
+    public void testSortFRJoin() throws IOException{
+      pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+      pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+      pigServer.registerQuery("D = ORDER A by y;");
+      pigServer.registerQuery("E = ORDER B by y;");
+      DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
+      {
+          pigServer.registerQuery("C = join D by $0, E by $0 using 
\"replicated\";");
+          Iterator<Tuple> iter = pigServer.openIterator("C");
+          
+          while(iter.hasNext()) {
+              dbfrj.add(iter.next());
+          }
+      }
+      {
+          pigServer.registerQuery("C = join D by $0, E by $0;");
+          Iterator<Tuple> iter = pigServer.openIterator("C");
+          
+          while(iter.hasNext()) {
+              dbshj.add(iter.next());
+          }
+      }
+      Assert.assertEquals(dbfrj.size(), dbshj.size());
+      Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));        
+    }
+    
+    public void testDistinctFRJoin() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
+        pigServer.registerQuery("D = distinct A ;");
+        pigServer.registerQuery("E = distinct B ;");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join D by $0, E by $0 using 
\"replicated\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join D by $0, E by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));       
 
+      }
+    
     @Test
     public void testUDFFRJ() throws IOException {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:chararray,y:int);");


Reply via email to