Author: pradeepkth
Date: Tue Sep  8 17:40:12 2009
New Revision: 812595

URL: http://svn.apache.org/viewvc?rev=812595&view=rev
Log:
join ... outer, ... outer semantics are no-ops, should produce corresponding
null values (pradeepkth)

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Sep  8 17:40:12 2009
@@ -28,6 +28,9 @@
 
 IMPROVEMENTS
 
+PIG-578: join ... outer, ... outer semantics are no-ops, should produce
+corresponding null values (pradeepkth)
+
 PIG-936: making dump and PigDump independent from Tuple.toString (daijy)
 
 PIG-890: Create a sampler interface and improve the skewed join sampler 
(sriranjan via daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Tue Sep  8 17:40:12 2009
@@ -33,7 +33,11 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -42,6 +46,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.IsEmpty;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -928,39 +933,145 @@
             poPackage.setKeyType(type);
             poPackage.setResultType(DataType.TUPLE);
             poPackage.setNumInps(count);
-                
-               boolean inner[] = new boolean[count];
-               for (int i=0;i<count;i++) {
-                   inner[i] = true;
-               }
-               poPackage.setInner(inner);
-               
+            
+            boolean[] innerFlags = loj.getInnerFlags();
+            poPackage.setInner(innerFlags);
+            
                List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
                List<Boolean> flattenLst = new ArrayList<Boolean>();
-               for(int i=1;i<=count;i++){
-                   PhysicalPlan fep1 = new PhysicalPlan();
-                   POProject feproj1 = new POProject(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), loj.getRequestedParallelism(), i);
-                   feproj1.setResultType(DataType.BAG);
-                   feproj1.setOverloaded(false);
-                   fep1.add(feproj1);
-                   fePlans.add(fep1);
-                   flattenLst.add(true);
-               }
                
-               POForEach fe = new POForEach(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), loj.getRequestedParallelism(), fePlans, 
flattenLst );
-               currentPlan.add(fe);
                try{
+               for(int i=0;i< count;i++){
+                   PhysicalPlan fep1 = new PhysicalPlan();
+                   POProject feproj1 = new POProject(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), 
+                           loj.getRequestedParallelism(), i+1); //i+1 since 
the first column is the "group" field
+                   feproj1.setResultType(DataType.BAG);
+                   feproj1.setOverloaded(false);
+                   fep1.add(feproj1);
+                   fePlans.add(fep1);
+                   // the parser would have marked the side
+                   // where we need to keep empty bags on
+                   // non matched as outer (innerFlags[i] would be
+                   // false)
+                   if(!(innerFlags[i])) {
+                       LogicalOperator joinInput = inputs.get(i);
+                        // for outer join add a bincond
+                        // which will project nulls when bag is
+                        // empty
+                        updateWithEmptyBagCheck(fep1, joinInput);
+                   }
+                   flattenLst.add(true);
+               }
+               
+               POForEach fe = new POForEach(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), 
+                       loj.getRequestedParallelism(), fePlans, flattenLst );
+               currentPlan.add(fe);
                    currentPlan.connect(poPackage, fe);
+                   LogToPhyMap.put(loj, fe);
                }catch (PlanException e1) {
                    int errCode = 2015;
                    String msg = "Invalid physical operators in the physical 
plan" ;
                    throw new LogicalToPhysicalTranslatorException(msg, 
errCode, PigException.BUG, e1);
                }
-               LogToPhyMap.put(loj, fe);
+               
                }
        }
 
-       private boolean validateMergeJoin(LOJoin loj) throws 
LogicalToPhysicalTranslatorException{
+       /**
+        * Updates the plan with a check for an empty bag: if the bag is empty, a bag
+        * with as many nulls as dictated by the schema is flattened in its place.
+        * @param fePlan the plan to update
+        * @param joinInput the relation for which the corresponding bag is 
being checked
+        * @throws PlanException
+        * @throws LogicalToPhysicalTranslatorException
+        */
+    public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, 
LogicalOperator joinInput) throws PlanException, 
LogicalToPhysicalTranslatorException {
+        Schema inputSchema = null;
+        try {
+            inputSchema = joinInput.getSchema();
+            
+            
+            if(inputSchema == null) {
+                int errCode = 1105;
+                String msg = "Input (" + joinInput.getAlias() + ") " +
+                        "on which outer join is desired should have a valid 
schema";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.INPUT);
+            }
+        } catch (FrontendException e) {
+            int errCode = 2014;
+            String msg = "Error while determining the schema of input";
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e);
+        }
+        
+        // we currently have POProject[bag] as the only operator in the plan
+        // If the bag is an empty bag, we should replace
+        // it with a bag with one tuple with null fields so that when we 
flatten
+        // we do not drop records (flatten will drop records if the bag is left
+        // as an empty bag) and actually project nulls for the fields in 
+        // the empty bag
+        
+        // So we need to get to the following state:
+        // POProject[Bag]
+        //         \     
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
+        //                        \      |    POProject[Bag]             
+        //                         \     |    /
+        //                          POBinCond
+        
+        POProject relationProject = (POProject) fePlan.getRoots().get(0);
+        try {
+            
+            // condition of the bincond
+            POProject relationProjectForIsEmpty = relationProject.clone();
+            fePlan.add(relationProjectForIsEmpty);
+            String scope = relationProject.getOperatorKey().scope;
+            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, 
NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)), -1, null, isEmptySpec, 
(EvalFunc) f);
+            isEmpty.setResultType(DataType.BOOLEAN);
+            fePlan.add(isEmpty);
+            fePlan.connect(relationProjectForIsEmpty, isEmpty);
+            
+            // lhs of bincond (const bag with null fields)
+            ConstantExpression ce = new ConstantExpression(new 
OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            // the following should give a tuple with the
+            // required number of nulls
+            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+            for(int i = 0; i < inputSchema.size(); i++) {
+                t.set(i, null);
+            }
+            List<Tuple> bagContents = new ArrayList<Tuple>(1);
+            bagContents.add(t);
+            DataBag bg = new NonSpillableDataBag(bagContents);
+            ce.setValue(bg);
+            ce.setResultType(DataType.BAG);
+            //this operator doesn't have any predecessors
+            fePlan.add(ce);
+            
+            //rhs of bincond is the original project
+            // let's set up the bincond now
+            POBinCond bincond = new POBinCond(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            bincond.setCond(isEmpty);
+            bincond.setLhs(ce);
+            bincond.setRhs(relationProject);
+            bincond.setResultType(DataType.BAG);
+            fePlan.add(bincond);
+
+            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(ce, bincond);
+            fePlan.connect(relationProject, bincond);
+
+        } catch (Exception e) {
+            throw new PlanException("Error setting up outerjoin", e);
+        }
+        
+        
+    }
+
+    private boolean validateMergeJoin(LOJoin loj) throws 
LogicalToPhysicalTranslatorException{
            
            List<LogicalOperator> preds = loj.getInputs();
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
 Tue Sep  8 17:40:12 2009
@@ -139,14 +139,12 @@
     public void visit(LOJoin join) throws VisitorException {
         String scope = join.getOperatorKey().scope;
         List<LogicalOperator> inputs = join.getInputs();
+        boolean[] innerFlags = join.getInnerFlags();
 
         // In local mode, LOJoin is achieved by POCogroup followed by a 
POForEach with flatten
         // Insert a POCogroup in the place of LOJoin
         POCogroup poc = new POCogroup(new OperatorKey(scope, 
nodeGen.getNextNodeId(scope)), join.getRequestedParallelism());
-        boolean innerArray[] = new boolean[join.getInputs().size()];
-        for (int i=0;i<join.getInputs().size();i++)
-            innerArray[i] = true;
-        poc.setInner(innerArray);
+        poc.setInner(innerFlags);
         
         currentPlan.add(poc);
         
@@ -207,6 +205,7 @@
         // Append POForEach after POCogroup
         List<Boolean> flattened = new ArrayList<Boolean>();
         List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        
         for (int i=0;i<join.getInputs().size();i++)
         {
             PhysicalPlan ep = new PhysicalPlan();
@@ -217,6 +216,21 @@
             prj.setStar(false);
             ep.add(prj);
             eps.add(ep);
+            // the parser would have marked the side
+            // where we need to keep empty bags on
+            // non matched as outer (innerFlags[i] would be
+            // false)
+            if(!(innerFlags[i])) {
+                LogicalOperator joinInput = inputs.get(i);
+                // for outer join add a bincond
+                // which will project nulls when bag is
+                // empty
+                try {
+                    updateWithEmptyBagCheck(ep, joinInput);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+            }
             flattened.add(true);
         }
         

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Tue Sep  
8 17:40:12 2009
@@ -63,7 +63,7 @@
      */
     private static Log log = LogFactory.getLog(LOJoin.class);
     private MultiMap<LogicalOperator, LogicalPlan> mJoinPlans;
-
+    private boolean[] mInnerFlags;
        private JOINTYPE mJoinType; // Retains the type of the join
 
     /**
@@ -81,10 +81,20 @@
             LogicalPlan plan,
             OperatorKey k,
             MultiMap<LogicalOperator, LogicalPlan> joinPlans,
-            JOINTYPE jt) {
+            JOINTYPE jt,
+            boolean[] isInner) {
         super(plan, k);
         mJoinPlans = joinPlans;
                mJoinType = jt;
+               mInnerFlags = getCopy(isInner);
+    }
+    
+    private boolean[] getCopy(boolean[] flags) {
+        boolean[] retVal = new boolean[flags.length];
+        for (int i = 0; i < flags.length; i++) {
+            retVal[i] = flags[i];
+        }
+        return retVal;
     }
 
     public List<LogicalOperator> getInputs() {
@@ -544,4 +554,11 @@
         // shall not get here
         return null;
     }
+
+    /**
+     * @return the mInnerFlags
+     */
+    public boolean[] getInnerFlags() {
+        return getCopy(mInnerFlags);
+    }
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Tue Sep  8 17:40:12 2009
@@ -351,7 +351,7 @@
                        isInner[i] = gi.isInner;
                }
 
-               LogicalOperator loj  = new LOJoin(lp, new OperatorKey(scope, 
getNextId()), joinPlans, jt);
+               LogicalOperator loj  = new LOJoin(lp, new OperatorKey(scope, 
getNextId()), joinPlans, jt, isInner);
                lp.add(loj);
                log.debug("Added operator " + loj.getClass().getName() + " 
object " + loj + " to the logical plan " + lp);
                
@@ -863,6 +863,9 @@
 TOKEN : { <STDOUT: "stdout"> }
 TOKEN : { <LIMIT: "limit"> }
 TOKEN : { <SAMPLE: "sample"> }
+TOKEN : { <LEFT: "left"> }
+TOKEN : { <RIGHT: "right"> }
+TOKEN : { <FULL: "full"> }
 
 TOKEN:
 {
@@ -1637,6 +1640,53 @@
 
 }
 
+CogroupInput JoinItem(LogicalPlan lp) :
+{
+    LogicalOperator cgOp; 
+    boolean isInner = true;
+    ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); 
+    LogicalPlan groupByPlan;
+    ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+    ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
+    log.trace("Entering JoinItem");
+    log.debug("LogicalPlan: " + lp);
+}
+{
+    (
+        cgOp = NestedExpr(lp)
+        (
+            ( <BY> 
+                ( 
+                    LOOKAHEAD ( "(" 
FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
+                    ( "(" FlattenedGenerateItem(cgOp.getSchema(), null, 
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
+                        {listPlans.add(groupByPlan);}
+                        (
+                            "," FlattenedGenerateItem(cgOp.getSchema(), null, 
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
+                            {listPlans.add(groupByPlan);}
+                        )*
+                        ")" 
+                    )
+                |   (
+                        FlattenedGenerateItem(cgOp.getSchema(), null, 
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
+                        {listPlans.add(groupByPlan);}
+                    )
+                )
+            )   
+        )        
+    )
+    {
+        CogroupInput cogroupInput = new CogroupInput(); 
+
+        cogroupInput.plans = listPlans;
+        cogroupInput.op = cgOp;
+        cogroupInput.isInner = isInner;
+        
+        log.trace("Exiting GroupItem");     
+        return cogroupInput;
+    }
+}
+
+
 CogroupInput GroupItem(LogicalPlan lp) : 
 {
        ExpressionOperator es; 
@@ -1870,15 +1920,86 @@
        LogicalOperator frj = null;
        LogicalOperator skj = null;
        LogicalOperator smj = null;
+       boolean isLeftOuter = false;
+       boolean isRightOuter = false;
+       boolean isFullOuter = false;
+       boolean isOuter = false;
 }
 {
-       (gi = GroupItem(lp) { gis.add(gi); }
-       ("," gi = GroupItem(lp) { gis.add(gi); })+
+       (gi = JoinItem(lp) { gis.add(gi); }
+       [
+           (<LEFT> [<OUTER>] { isLeftOuter = true;})
+        |
+        (<RIGHT> [<OUTER>] {isRightOuter = true;})
+        |
+        (<FULL> [<OUTER>] {isFullOuter = true;})
+       ]
+       ("," gi = JoinItem(lp) { gis.add(gi); })+
+       
+       {
+               // in the case of outer joins, only two
+               // inputs are allowed
+               isOuter = (isLeftOuter || isRightOuter || isFullOuter);
+               if(isOuter && gis.size() > 2) {
+                 throw new ParseException("(left|right|full) outer joins are 
only supported for two inputs");  
+               }
+               
+               // we have exactly two inputs
+               
+               // the semantics of "outer"
+        // for join are different from cogroup
+        // cogroup a by $0 inner, b by $0 outer means keep
+        // all keys from a and for cases where there is no match
+        // from b have an empty bag for b. For keys in b which
+        // do not match in a, there will be no output records.
+        // Whereas with join,
+        // join a by $0 inner, b by $0 outer implies right outer
+        // join which has the exact opposite semantics - for
+        // all keys in b which do not have a match in a we need to
+        // output null for fields in a. For keys in a which do not
+        // match in b, no record should be output. Since we will be
+        // using the same underlying implementation for outer join
+        // as cogroup we should achieve join semantics by setting the
+        // isinner flag accordingly
+        if (isLeftOuter) {
+            gis.get(1).isInner = false;
+        } else if (isRightOuter) {
+            gis.get(0).isInner = false;
+        } else if (isFullOuter) {
+            gis.get(0).isInner = false;
+            gis.get(1).isInner = false;
+        }
+               
+       }
        // For all types of join we create LOJoin and mark what type of join it 
is.
-       ([<USING> ("\"replicated\"" { frj = parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED); }
-       | "\"repl\"" { frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);}
-    |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); }
-    |"\"merge\"" { smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); })] ))
+       (
+               [<USING> ("\"replicated\"" { 
+                       if(isOuter) {
+                           throw new ParseException("Replicated join does not 
support (left|right|full) outer joins");
+                       }
+                                   frj = parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED); 
+                           }
+                       | "\"repl\"" { 
+                                   if(isOuter) {
+                        throw new ParseException("Replicated join does not 
support (left|right|full) outer joins");
+                    }
+                                   frj=parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED);
+                               }
+                   |"\"skewed\"" { 
+                           if(isOuter) {
+                        throw new ParseException("Skewed join does not support 
(left|right|full) outer joins");
+                    }
+                           skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); 
+                       }
+                   |"\"merge\"" { 
+                           if(isOuter) {
+                        throw new ParseException("Merge join does not support 
(left|right|full) outer joins");
+                    }
+                           smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); 
+                       })
+           ] 
+    )
+    )
 
        {log.trace("Exiting JoinClause");
        if (frj!=null) {

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=812595&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Sep  8 17:40:12 
2009
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.Identity;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test cases to test join statement
+ */
+public class TestJoin extends TestCase {
+    
+    MiniCluster cluster;
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+    
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        cluster =  MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    
+    @Test
+    public void testDefaultJoin() throws IOException, ParseException {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+        };
+        
+        Util.createInputFile(cluster, "a.txt", input1);
+        Util.createInputFile(cluster, "b.txt", input2);
+        Tuple expectedResult = 
(Tuple)Util.getPigConstant("('hello',1,'hello','world')");
+        
+        // with schema
+        String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+                       "b = load 'b.txt' as (n:chararray, m:chararray); " +
+                       "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        assertEquals(true, it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertEquals(false, it.hasNext());
+        
+        // without schema
+        script = "a = load 'a.txt'; " +
+        "b = load 'b.txt'; " +
+        "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        it = pigServer.openIterator("c");
+        assertEquals(true, it.hasNext());
+        assertEquals(expectedResult.toString(), it.next().toString());
+        assertEquals(false, it.hasNext());
+        Util.deleteFile(cluster, "a.txt");
+        Util.deleteFile(cluster, "b.txt");
+    }
+    
+    @Test
+    public void testLeftOuterJoin() throws IOException, ParseException {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+
+        };
+        
+        Util.createInputFile(cluster, "a.txt", input1);
+        Util.createInputFile(cluster, "b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "('hello',1,'hello','world')",
+                        "('bye',2,null,null)",
+                        "(null,3,null,null)"
+                });
+        
+        // with and without optional outer
+        for(int i = 0; i < 2; i++) {
+            //with schema
+            String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+                    "b = load 'b.txt' as (n:chararray, m:chararray); ";
+            if(i == 0) {
+                script +=  "c = join a by $0 left outer, b by $0;" ;
+            } else {
+                script +=  "c = join a by $0 left, b by $0;" ;
+            }
+            script += "d = order c by $1;";
+            // ensure we parse correctly
+            LogicalPlanTester lpt = new LogicalPlanTester();
+            lpt.buildPlan(script);
+            
+            // run query and test results only once
+            if(i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter= 0;
+                while(it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+                
+                // without schema
+                script = "a = load 'a.txt'; " +
+                "b = load 'b.txt'; ";
+                if(i == 0) {
+                    script +=  "c = join a by $0 left outer, b by $0;" ;
+                } else {
+                    script +=  "c = join a by $0 left, b by $0;" ;
+                }
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        Util.deleteFile(cluster, "a.txt");
+        Util.deleteFile(cluster, "b.txt");
+    }
+
+    @Test
+    public void testRightOuterJoin() throws IOException, ParseException {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+
+        };
+        
+        Util.createInputFile(cluster, "a.txt", input1);
+        Util.createInputFile(cluster, "b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "(null,null,null,'evening')",
+                        "(null,null,'good','morning')",
+                        "('hello',1,'hello','world')"
+                                       });
+        // with and without optional outer
+        for(int i = 0; i < 2; i++) {
+            // with schema
+            String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+                    "b = load 'b.txt' as (n:chararray, m:chararray); ";
+            if(i == 0) {
+                script +=  "c = join a by $0 right outer, b by $0;" ;
+            } else {
+                script +=  "c = join a by $0 right, b by $0;" ;
+            }
+            script += "d = order c by $3;";
+            // ensure we parse correctly
+            LogicalPlanTester lpt = new LogicalPlanTester();
+            lpt.buildPlan(script);
+            
+            // run query and test results only once
+            if(i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter= 0;
+                while(it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+                
+                // without schema
+                script = "a = load 'a.txt'; " +
+                "b = load 'b.txt'; " ;
+                if(i == 0) {
+                    script +=  "c = join a by $0 right outer, b by $0;" ;
+                } else {
+                    script +=  "c = join a by $0 right, b by $0;" ;
+                }
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        Util.deleteFile(cluster, "a.txt");
+        Util.deleteFile(cluster, "b.txt");
+    }
+    
+    @Test
+    public void testFullOuterJoin() throws IOException, ParseException {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+
+        };
+        
+        Util.createInputFile(cluster, "a.txt", input1);
+        Util.createInputFile(cluster, "b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "(null,null,null,'evening')" ,
+                        "(null,null,'good','morning')" ,
+                        "('hello',1,'hello','world')" ,
+                        "('bye',2,null,null)" ,
+                        "(null,3,null,null)"
+                                       });
+        // with and without optional outer
+        for(int i = 0; i < 2; i++) {
+            // with schema
+            String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+                    "b = load 'b.txt' as (n:chararray, m:chararray); ";
+            if(i == 0) {
+                script +=  "c = join a by $0 full outer, b by $0;" ;
+            } else {
+                script +=  "c = join a by $0 full, b by $0;" ;
+            }
+            script += "d = order c by $1, $3;";
+            // ensure we parse correctly
+            LogicalPlanTester lpt = new LogicalPlanTester();
+            lpt.buildPlan(script);
+            
+            // run query and test results only once
+            if(i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter= 0;
+                while(it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+                
+                // without schema
+                script = "a = load 'a.txt'; " +
+                "b = load 'b.txt'; " ;
+                if(i == 0) {
+                    script +=  "c = join a by $0 full outer, b by $0;" ;
+                } else {
+                    script +=  "c = join a by $0 full, b by $0;" ;
+                }
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        Util.deleteFile(cluster, "a.txt");
+        Util.deleteFile(cluster, "b.txt");
+    }
+    
+    @Test
+    public void testMultiOuterJoinFailure() {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
+        lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
+        lpt.buildPlan("c = load 'c.txt' as (n:chararray, m:chararray); ");
+        String[] types = new String[] { "left", "right", "full" };
+        for (int i = 0; i < types.length; i++) {
+            boolean errCaught = false;
+            try {
+                lpt.buildPlanThrowExceptionOnError("d = join a by $0 " + types[i] + " outer, b by $0, c by $0;") ;
+                
+            } catch(Exception e) {
+                errCaught = true;
+                assertEquals("(left|right|full) outer joins are only supported for two inputs", e.getMessage());
+            }
+            assertEquals(true, errCaught);
+            
+        }
+        
+    }
+    
+    @Test
+    public void testNonRegularOuterJoinFailure() {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
+        lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
+        String[] types = new String[] { "left", "right", "full" };
+        String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" };
+        for (int i = 0; i < types.length; i++) {
+            for(int j = 0; j < joinTypes.length; j++) {
+                boolean errCaught = false;
+                try {
+                    lpt.buildPlanThrowExceptionOnError(
+                            "d = join a by $0 " + types[i] + " outer, b by $0 using \"" + joinTypes[j] +"\" ;") ;
+                    
+                } catch(Exception e) {
+                    errCaught = true;
+                    assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+                }
+                assertEquals(true, errCaught);
+            }
+            
+        }
+        
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Tue Sep  8 17:40:12 2009
@@ -363,11 +363,19 @@
         return schema;
     }
     
-    static Object getPigConstant(String pigConstantAsString) throws ParseException {
+    public static Object getPigConstant(String pigConstantAsString) throws ParseException {
         ByteArrayInputStream stream = new ByteArrayInputStream(pigConstantAsString.getBytes()) ;
         QueryParser queryParser = new QueryParser(stream) ;
         return queryParser.Datum();
     }
+    
+    public static List<Tuple> getTuplesFromConstantTupleStrings(String[] tupleConstants) throws ParseException {
+        List<Tuple> result = new ArrayList<Tuple>(tupleConstants.length);
+        for(int i = 0; i < tupleConstants.length; i++) {
+            result.add((Tuple) getPigConstant(tupleConstants[i]));
+        }
+        return result;
+    }
 
     public static File createFile(String[] data) throws Exception{
         File f = File.createTempFile("tmp", "");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java Tue Sep  8 17:40:12 2009
@@ -23,6 +23,7 @@
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaCalculator;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.PlanValidationException;
@@ -82,6 +83,10 @@
     public LogicalPlan buildPlan(String query) {
         return buildPlan(query, LogicalPlanBuilder.class.getClassLoader());
     }
+    
+    public LogicalPlan buildPlanThrowExceptionOnError(String query) throws Exception {
+        return buildPlanThrowExceptionOnError(query, LogicalPlanBuilder.class.getClassLoader());
+    }
 
 
     /***
@@ -207,30 +212,8 @@
         LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
 
         try {
-            LogicalPlan lp = builder.parse(SCOPE,
-                                           query,
-                                           aliases,
-                                           logicalOpTable,
-                                           aliasOp,
-                                           fileNameMap);
-
-            List<LogicalOperator> roots = lp.getRoots();
-
-            if(roots.size() > 0) {
-                if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
-                    System.out.println(query);
-                    System.out.println(logicalOpTable.get(roots.get(0)));
-                }
-                if ((roots.get(0)).getAlias()!=null){
-                    aliases.put(roots.get(0), lp);
-                }
-            }
-
-            assertTrue(lp != null);
-
-            return lp ;
-        }
-        catch (IOException e) {
+            return parse(query, builder);
+        }  catch (IOException e) {
             fail("IOException: " + e.getMessage());
         }
         catch (Exception e) {
@@ -239,7 +222,47 @@
         }
         return null;
     }
+    
+    private LogicalPlan parse(String query, LogicalPlanBuilder builder) throws IOException, ParseException {
+        LogicalPlan lp = builder.parse(SCOPE,
+                query,
+                aliases,
+                logicalOpTable,
+                aliasOp,
+                fileNameMap);
+
+        List<LogicalOperator> roots = lp.getRoots();
+        
+        if(roots.size() > 0) {
+        if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
+        System.out.println(query);
+        System.out.println(logicalOpTable.get(roots.get(0)));
+        }
+        if ((roots.get(0)).getAlias()!=null){
+        aliases.put(roots.get(0), lp);
+        }
+        }
+        
+        assertTrue(lp != null);
+        
+        return lp ;
+
+    }
 
+    private LogicalPlan buildPlanThrowExceptionOnError (String query, ClassLoader cldr) throws IOException, ParseException {
+
+        LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+        try {
+            pigContext.connect();
+        } catch (ExecException e1) {
+            fail(e1.getClass().getName() + ": " + e1.getMessage() + " -- " + query);
+        }
+        LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
+
+        return parse(query, builder);
+    }
+    
     public void setPlan(LogicalPlan lp) throws VisitorException {
         PlanSetter ps = new PlanSetter(lp);
         ps.visit();


Reply via email to