Author: daijy
Date: Wed Aug 19 07:01:15 2009
New Revision: 805684

URL: http://svn.apache.org/viewvc?rev=805684&view=rev
Log:
PIG-925: Fix join in local mode

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug 19 07:01:15 2009
@@ -54,6 +54,8 @@
 
 BUG FIXES
     
+    PIG-925: Fix join in local mode (daijy)
+
     PIG-913: Error in Pig script when grouping on chararray column (daijy)
 
     PIG-907: Provide multiple version of HashFNV (Piggybank) (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Wed Aug 19 07:01:15 2009
@@ -28,7 +28,9 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
@@ -41,6 +43,7 @@
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
 import org.apache.pig.impl.logicalLayer.LOStore;
@@ -48,6 +51,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.PlanWalker;
@@ -132,6 +136,106 @@
     }
     
     @Override
+    public void visit(LOJoin join) throws VisitorException {
+        String scope = join.getOperatorKey().scope;
+        List<LogicalOperator> inputs = join.getInputs();
+
+        // In local mode, LOJoin is achieved by POCogroup followed by a POForEach with flatten
+        // Insert a POCogroup in the place of LOJoin
+        POCogroup poc = new POCogroup(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), join.getRequestedParallelism());
+        boolean innerArray[] = new boolean[join.getInputs().size()];
+        for (int i=0;i<join.getInputs().size();i++)
+            innerArray[i] = true;
+        poc.setInner(innerArray);
+        
+        currentPlan.add(poc);
+        
+        // Add inner plans to POCogroup
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator lo : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) 
join.getJoinPlans().get(lo);
+            
+            POLocalRearrangeForIllustrate physOp = new 
POLocalRearrangeForIllustrate(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), join
+                    .getRequestedParallelism());
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add(currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            try {
+                physOp.setPlans(exprPlans);
+            } catch (PlanException pe) {
+                throw new VisitorException(pe);
+            }
+            try {
+                physOp.setIndex(count++);
+            } catch (ExecException e1) {
+                throw new VisitorException(e1);
+            }
+            if (plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(LogToPhyMap.get(lo), physOp);
+                currentPlan.connect(physOp, poc);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+            
+        }
+        
+        // Append POForEach after POCogroup
+        List<Boolean> flattened = new ArrayList<Boolean>();
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        for (int i=0;i<join.getInputs().size();i++)
+        {
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)));
+            prj.setResultType(DataType.BAG);
+            prj.setColumn(i+1);
+            prj.setOverloaded(false);
+            prj.setStar(false);
+            ep.add(prj);
+            eps.add(ep);
+            flattened.add(true);
+        }
+        
+        POForEach fe = new POForEach(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),-1,eps,flattened);
+        
+        fe.setResultType(DataType.BAG);
+
+        currentPlan.add(fe);
+        LogToPhyMap.put(join, fe);
+        try {
+            currentPlan.connect(poc, fe);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e);
+        }
+    }
+    
+    @Override
     public void visit(LOSplit split) throws VisitorException {
        String scope = split.getOperatorKey().scope;
         PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java Wed Aug 19 07:01:15 2009
@@ -136,6 +136,47 @@
         
     }
     
+    @Test
+    public void testJoin1() throws Exception {
+        // Regression test for PIG-925
+        File fp1 = File.createTempFile("test1", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+        
+        ps.println("1\t1");
+        ps.println("2\t2");
+        ps.close();
+        
+        File fp2 = File.createTempFile("test2", "txt");
+        ps = new PrintStream(new FileOutputStream(fp2));
+        
+        ps.println("1\t1");
+        ps.println("2\t2");
+        ps.close();
+        
+        
+        pig.registerQuery("A = load '" + Util.generateURI(fp1.toString()) + 
"'AS (a0:int, a1:int); ");
+        pig.registerQuery("B = load '" + Util.generateURI(fp2.toString()) + 
"'AS (b0:int, b1:int); ");
+        pig.registerQuery("C = join A by a0, B by b0;");
+        
+        Iterator<Tuple> iter = pig.openIterator("C");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.get(0).equals(new Integer(1)));
+        assertTrue(t.get(1).equals(new Integer(1)));
+        assertTrue(t.get(2).equals(new Integer(1)));
+        assertTrue(t.get(3).equals(new Integer(1)));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.get(0).equals(new Integer(2)));
+        assertTrue(t.get(1).equals(new Integer(2)));
+        assertTrue(t.get(2).equals(new Integer(2)));
+        assertTrue(t.get(3).equals(new Integer(2)));
+        
+        assertTrue(!iter.hasNext());
+    }
+    
+    
     static public class Pig800Udf extends EvalFunc<DataBag> {
         
         @Override


Reply via email to