Author: pradeepkth
Date: Tue Nov  3 23:27:41 2009
New Revision: 832599

URL: http://svn.apache.org/viewvc?rev=832599&view=rev
Log:
PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)
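
For reference, a minimal sketch of the syntax this change enables, written in
the style of the new tests below (aliases, schemas, and file names here are
hypothetical):

    pigServer.registerQuery("A = LOAD 'left.txt' as (x:int,y:int);");
    pigServer.registerQuery("B = LOAD 'right.txt' as (x:int,y:int);");
    // Left outer fragment-replicate join: B is loaded into memory and
    // replicated; left tuples with no match are padded with nulls.
    pigServer.registerQuery("C = join A by x left, B by x using \"replicated\";");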

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Nov  3 23:27:41 2009
@@ -26,6 +26,8 @@
 
 IMPROVEMENTS
 
+PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)
+
 PIG-920:  optimizing diamond queries (rding via pradeepkth)
 
 PIG-1040: FINDBUGS: MS_SHOULD_BE_FINAL: Field isn't final but should be (olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Nov  3 23:27:41 2009
@@ -880,15 +880,49 @@
                                }
                        }
                        logToPhyMap.put(loj, skj);
-               } 
-               
+               }
                else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
                
                int fragment = 0;
                POFRJoin pfrj;
                try {
+                   boolean []innerFlags = loj.getInnerFlags();
+                   boolean isLeftOuter = false;
+                   // We don't check for bounds issues as we assume that a join
+                   // involves at least two inputs
+                   isLeftOuter = !innerFlags[1];
+                   
+                   Tuple nullTuple = null;
+                   if( isLeftOuter ) {
+                       try {
+                           // We know that in a Left outer join it's only a two way
+                           // join, so we assume index of 1 for the right input
+                           Schema inputSchema = inputs.get(1).getSchema();
+                           
+                           // We check if we have a schema before the join
+                           if(inputSchema == null) {
+                               int errCode = 1109;
+                           String msg = "Input (" + inputs.get(1).getAlias() + ") " +
+                           "on which outer join is desired should have a valid schema";
+                           throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+                           }
+                           
+                           // Using the schema we decide the number of columns/fields
+                           // in the nullTuple
+                           nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
+                           for(int j = 0; j < inputSchema.size(); j++) {
+                               nullTuple.set(j, null);
+                           }
+                           
+                       } catch( FrontendException e ) {
+                           int errCode = 2104;
+                        String msg = "Error while determining the schema of input";
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                       }
+                   }
+                   
                    pfrj = new POFRJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
-                                               inp, ppLists, keyTypes, null, 
fragment);
+                                               inp, ppLists, keyTypes, null, 
fragment, isLeftOuter, nullTuple);
                } catch (ExecException e1) {
                    int errCode = 2058;
                    String msg = "Unable to set index on newly create POLocalRearrange.";
@@ -1073,13 +1107,13 @@
          
           
             if(inputSchema == null) {
-                int errCode = 1105;
+                int errCode = 1109;
                 String msg = "Input (" + joinInput.getAlias() + ") " +
                         "on which outer join is desired should have a valid schema";
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
             }
         } catch (FrontendException e) {
-            int errCode = 2014;
+            int errCode = 2104;
             String msg = "Error while determining the schema of input";
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
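
The hunk above pre-builds a tuple of nulls sized to the right input's schema;
POFRJoin later substitutes it when a left tuple finds no match. A standalone
sketch of that construction, assuming Pig's TupleFactory API and a
hypothetical helper name:

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical helper: build the all-null stand-in for a missing
    // right-side match; rightArity is the arity of the right input's schema.
    static Tuple buildNullTuple(int rightArity) throws ExecException {
        Tuple t = TupleFactory.getInstance().newTuple(rightArity);
        for (int j = 0; j < rightArity; j++) {
            t.set(j, null); // one explicit null per field
        }
        return t;
    }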

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Tue Nov  3 23:27:41 2009
@@ -24,7 +24,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ExecType;
@@ -39,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -49,19 +49,18 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
-
 /**
- * The operator models the join keys using the Local Rearrange operators which 
- * are configured with the plan specified by the user. It also sets up
- * one Hashtable per replicated input which maps the Key(k) stored as a Tuple
- * to a DataBag which holds all the values in the input having the same key(k)
- * The getNext() reads an input from its predecessor and separates them into
- * key & value. It configures a foreach operator with the databags obtained 
from
- * each Hashtable for the key and also with the value for the fragment input.
- * It then returns tuples returned by this foreach operator.
+ * The operator models the join keys using the Local Rearrange operators which
+ * are configured with the plan specified by the user. It also sets up one
+ * Hashtable per replicated input which maps the Key(k) stored as a Tuple to a
+ * DataBag which holds all the values in the input having the same key(k) The
+ * getNext() reads an input from its predecessor and separates them into key &
+ * value. It configures a foreach operator with the databags obtained from each
+ * Hashtable for the key and also with the value for the fragment input. It then
+ * returns tuples returned by this foreach operator.
  */
 
-//We intentionally skip type checking in backend for performance reasons
+// We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
 public class POFRJoin extends PhysicalOperator {
     /**
@@ -69,35 +68,48 @@
      */
     private static final long serialVersionUID = 1L;
     static private Log log = LogFactory.getLog(POFRJoin.class);
-    //The number in the input list which denotes the fragmented input
+    // The number in the input list which denotes the fragmented input
     private int fragment;
-    //There can be n inputs each being a List<PhysicalPlan>
-    //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+    // There can be n inputs each being a List<PhysicalPlan>
+    // Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
     private List<List<PhysicalPlan>> phyPlanLists;
-    //The key type for each Local Rearrange operator
+    // The key type for each Local Rearrange operator
     private List<List<Byte>> keyTypes;
-    //The Local Rearrange operators modeling the join key
+    // The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
-    //The set of files that represent the replicated inputs
+    // The set of files that represent the replicated inputs
     private FileSpec[] replFiles;
-    //Used to configure the foreach operator
+    // Used to configure the foreach operator
     private ConstantExpression[] constExps;
-    //Used to produce the cross product of various bags
+    // Used to produce the cross product of various bags
     private POForEach fe;
-    //The array of Hashtables one per replicated input. replicates[fragment] = null
-    private Map<Tuple,List<Tuple>> replicates[];
-    //varaible which denotes whether we are returning tuples from the foreach operator
+    // The array of Hashtables one per replicated input. replicates[fragment] =
+    // null
+    // fragment is the input which is fragmented and not replicated.
+    private Map<Tuple, List<Tuple>> replicates[];
+    // variable which denotes whether we are returning tuples from the foreach
+    // operator
     private boolean processingPlan;
-    //A dummy tuple
+    // A dummy tuple
     private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
-    //An instance of tuple factory
+    // An instance of tuple factory
     private transient TupleFactory mTupleFactory;
     private transient BagFactory mBagFactory;
     private boolean setUp;
-    
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
-        super(k,rp,inp);
-        
+    // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
+    // Join
+    private boolean isLeftOuterJoin;
+
+    // This bag contains null tuples built according to the schemas of the various inputs
+    private DataBag nullBag;
+
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
+            List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
+            FileSpec[] replFiles, int fragment, boolean isLeftOuter,
+            Tuple nullTuple)
+            throws ExecException {
+        super(k, rp, inp);
+
         phyPlanLists = ppLists;
         this.fragment = fragment;
         this.keyTypes = keyTypes;
@@ -109,32 +121,39 @@
         processingPlan = false;
         mTupleFactory = TupleFactory.getInstance();
         mBagFactory = BagFactory.getInstance();
+        List<Tuple> tupList = new ArrayList<Tuple>();
+        tupList.add(nullTuple);
+        nullBag = mBagFactory.newDefaultBag(tupList);
+        this.isLeftOuterJoin = isLeftOuter;
     }
-    
-    public List<List<PhysicalPlan>> getJoinPlans(){
+
+    public List<List<PhysicalPlan>> getJoinPlans() {
         return phyPlanLists;
     }
-    
-    private OperatorKey genKey(OperatorKey old){
-        return new OperatorKey(old.scope,NodeIdGenerator.getGenerator().getNextNodeId(old.scope));
+
+    private OperatorKey genKey(OperatorKey old) {
+        return new OperatorKey(old.scope, NodeIdGenerator.getGenerator()
+                .getNextNodeId(old.scope));
     }
-    
+
     /**
      * Configures the Local Rearrange operators & the foreach operator
+     * 
      * @param old
-     * @throws ExecException 
+     * @throws ExecException
      */
-    private void createJoinPlans(OperatorKey old) throws ExecException{
+    private void createJoinPlans(OperatorKey old) throws ExecException {
         List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
         List<Boolean> flatList = new ArrayList<Boolean>();
-        
-        int i=-1;
+
+        int i = -1;
         for (List<PhysicalPlan> ppLst : phyPlanLists) {
             ++i;
             POLocalRearrange lr = new POLocalRearrange(genKey(old));
             lr.setIndex(i);
             lr.setResultType(DataType.TUPLE);
-            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : keyTypes.get(i).get(0));
+            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE
+                    : keyTypes.get(i).get(0));
             try {
                 lr.setPlans(ppLst);
             } catch (PlanException pe) {
@@ -142,18 +161,22 @@
                 String msg = "Problem with setting up local rearrange's plans.";
                 throw new ExecException(msg, errCode, PigException.BUG, pe);
             }
-            LRs[i]= lr;
+            LRs[i] = lr;
             ConstantExpression ce = new ConstantExpression(genKey(old));
-            ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+            ce.setResultType((i == fragment) ? DataType.TUPLE : DataType.BAG);
             constExps[i] = ce;
             PhysicalPlan pp = new PhysicalPlan();
             pp.add(ce);
             fePlans.add(pp);
             flatList.add(true);
         }
-        fe = new POForEach(genKey(old),-1,fePlans,flatList);
+        // The ForEach operator here is used for generating a Cross-Product
+        // It is given a set of constant expressions with
+        // Tuple,(Bag|Tuple),(...)
+        // It does a cross product on that and produces output.
+        fe = new POForEach(genKey(old), -1, fePlans, flatList);
     }
-    
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitFRJoin(this);
@@ -161,18 +184,17 @@
 
     @Override
     public String name() {
-        return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+        return "FRJoin[" + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
     }
 
     @Override
     public boolean supportsMultipleInputs() {
-        // TODO Auto-generated method stub
         return true;
     }
 
     @Override
     public boolean supportsMultipleOutputs() {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -180,34 +202,36 @@
     public Result getNext(Tuple t) throws ExecException {
         Result res = null;
         Result inp = null;
-        if(!setUp){
+        if (!setUp) {
             setUpHashMap();
             setUp = true;
         }
-        if(processingPlan){
-            //Return tuples from the for each operator
-            //Assumes that it is configured appropriately with
-            //the bags for the current key.
-            while(true) {
+        if (processingPlan) {
+            // Return tuples from the for each operator
+            // Assumes that it is configured appropriately with
+            // the bags for the current key.
+            while (true) {
                 res = fe.getNext(dummyTuple);
-                
-                if(res.returnStatus==POStatus.STATUS_OK){
+
+                if (res.returnStatus == POStatus.STATUS_OK) {
                     return res;
                 }
-                if(res.returnStatus==POStatus.STATUS_EOP){
-                    processingPlan = false;
+                if (res.returnStatus == POStatus.STATUS_EOP) {
+                    // We have completed all cross-products; now it's time to move
+                    // to the next tuple of the left side
+                    processingPlan = false;                    
                     break;
                 }
-                if(res.returnStatus==POStatus.STATUS_ERR) {
+                if (res.returnStatus == POStatus.STATUS_ERR) {
                     return res;
                 }
-                if(res.returnStatus==POStatus.STATUS_NULL) {
+                if (res.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 }
             }
         }
         while (true) {
-            //Process the current input
+            // Process the current input
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP
                     || inp.returnStatus == POStatus.STATUS_ERR)
@@ -215,99 +239,128 @@
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-            
-            //Separate Key & Value using the fragment's LR operator
+
+            // Separate Key & Value using the fragment's LR operator
             POLocalRearrange lr = LRs[fragment];
-            lr.attachInput((Tuple)inp.result);
+            lr.attachInput((Tuple) inp.result);
             Result lrOut = lr.getNext(dummyTuple);
-            if(lrOut.returnStatus!=POStatus.STATUS_OK) {
-                log.error("LocalRearrange isn't configured right or is not working");
+            if (lrOut.returnStatus != POStatus.STATUS_OK) {
+                log
+                        .error("LocalRearrange isn't configured right or is not working");
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
             Tuple key = TupleFactory.getInstance().newTuple(1);
-            key.set(0,lrOutTuple.get(1));
+            key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
-            
-            //Configure the for each operator with the relevant bags
-            int i=-1;
+
+            // Configure the for each operator with the relevant bags
+            int i = -1;
             boolean noMatch = false;
             for (ConstantExpression ce : constExps) {
                 ++i;
-                if(i==fragment){
+                if (i == fragment) {
+                    // We set the first CE as the tuple from fragmented Left
                     ce.setValue(value);
                     continue;
                 }
                 Map<Tuple, List<Tuple>> replicate = replicates[i];
-                if(!replicate.containsKey(key)){
+                if (!replicate.containsKey(key)) {
+                    if (isLeftOuterJoin) {
+                        ce.setValue(nullBag);
+                    }
                     noMatch = true;
                     break;
                 }
                 ce.setValue(mBagFactory.newDefaultBag(replicate.get(key)));
             }
-            if(noMatch)
+
+            // If this is not a LeftOuter Join and there was no match, we
+            // skip the processing of this left tuple and move ahead
+            if (!isLeftOuterJoin && noMatch)
                 continue;
             fe.attachInput(dumTup);
             processingPlan = true;
-            
+
+            // We are all set; we call getNext (this function), which will call
+            // getNext on the ForEach. That returns one tuple of the cross-product
+            // between the set constant expressions. All subsequent calls (by the
+            // parent) to this function will return the next tuple of the
+            // cross-product.
             Result gn = getNext(dummyTuple);
+
             return gn;
         }
     }
 
     /**
-     * Builds the HashMaps by reading each replicated input from the DFS
-     * using a Load operator
+     * Builds the HashMaps by reading each replicated input from the DFS using a
+     * Load operator
+     * 
      * @throws ExecException
      */
     private void setUpHashMap() throws ExecException {
-        int i=-1;
+        int i = -1;
         long time1 = System.currentTimeMillis();
         for (FileSpec replFile : replFiles) {
             ++i;
-            if(i==fragment){
+
+            if (i == fragment) {
                 replicates[i] = null;
                 continue;
             }
 
-            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
-            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
+                    replFile, false);
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,
+                    ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
             pc.connect();
             ld.setPc(pc);
+            // We use the LocalRearrange Operator to separate Key and Values
+            // eg. ( a, b, c ) would generate a, ( a, b, c )
+            // And we use 'a' as the key to the HashMap
+            // The rest '( a, b, c )' is added to HashMap as value
+            // We could have manually done this, but LocalRearrange does the
+            // same thing, so utilizing its functionality
             POLocalRearrange lr = LRs[i];
-            lr.setInputs(Arrays.asList((PhysicalOperator)ld));
-            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+            lr.setInputs(Arrays.asList((PhysicalOperator) ld));
+            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(
+                    1000);
             log.debug("Completed setup. Trying to build replication hash table");
             int cnt = 0;
-            for(Result res=lr.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(dummyTuple)){
+            for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
                 ++cnt;
-                if(reporter!=null) reporter.progress();
+                if (reporter != null)
+                    reporter.progress();               
                 Tuple tuple = (Tuple) res.result;
                 Tuple key = mTupleFactory.newTuple(1);
-                key.set(0,tuple.get(1));
+                key.set(0, tuple.get(1));
                 Tuple value = getValueTuple(lr, tuple);
-                if(!replicate.containsKey(key))
+                if (!replicate.containsKey(key))
                     replicate.put(key, new ArrayList<Tuple>());
                 replicate.get(key).add(value);
             }
             replicates[i] = replicate;
 
         }
-       long time2 = System.currentTimeMillis();
-        log.debug("Hash Table built. Time taken: " + (time2-time1));
+        long time2 = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (time2 - time1));
     }
-    
-    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+
+    private void readObject(ObjectInputStream is) throws IOException,
+            ClassNotFoundException, ExecException {
         is.defaultReadObject();
         mTupleFactory = TupleFactory.getInstance();
         mBagFactory = BagFactory.getInstance();
-//        setUpHashTable();
+        // setUpHashTable();
     }
-    
+
     /*
      * Extracts the value tuple from the LR operator's output tuple
      */
-    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple)
+            throws ExecException {
         Tuple val = (Tuple) tuple.get(2);
         Tuple retTup = null;
         boolean isProjectStar = lr.isProjectStar();
@@ -315,18 +368,18 @@
         int keyLookupSize = keyLookup.size();
         Object key = tuple.get(1);
         boolean isKeyTuple = lr.isKeyTuple();
-        Tuple keyAsTuple = isKeyTuple ? (Tuple)tuple.get(1) : null;
-        if( keyLookupSize > 0) {
-            
+        Tuple keyAsTuple = isKeyTuple ? (Tuple) tuple.get(1) : null;
+        if (keyLookupSize > 0) {
+
             // we have some fields of the "value" in the
             // "key".
             retTup = mTupleFactory.newTuple();
             int finalValueSize = keyLookupSize + val.size();
-            int valIndex = 0; // an index for accessing elements from 
-                              // the value (val) that we have currently
-            for(int i = 0; i < finalValueSize; i++) {
+            int valIndex = 0; // an index for accessing elements from
+            // the value (val) that we have currently
+            for (int i = 0; i < finalValueSize; i++) {
                 Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
+                if (keyIndex == null) {
                     // the field for this index is not in the
                     // key - so just take it from the "value"
                     // we were handed
@@ -334,7 +387,7 @@
                     valIndex++;
                 } else {
                     // the field for this index is in the key
-                    if(isKeyTuple) {
+                    if (isKeyTuple) {
                         // the key is a tuple, extract the
                         // field out of the tuple
                         retTup.append(keyAsTuple.get(keyIndex));
@@ -343,19 +396,19 @@
                     }
                 }
             }
-            
+
         } else if (isProjectStar) {
-            
+
             // the whole "value" is present in the "key"
             retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
-            
+
         } else {
-            
+
             // there is no field of the "value" in the
             // "key" - so just make a copy of what we got
             // as the "value"
             retTup = mTupleFactory.newTuple(val.getAll());
-            
+
         }
         return retTup;
     }
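
To summarize the flow POFRJoin implements after this change: each replicated
input is materialized into a hash map at setup time, and every tuple from the
fragmented input probes those maps. A minimal illustrative sketch in plain
Java, with generic stand-ins for Pig's Tuple/DataBag types (the names here are
hypothetical, not Pig's API):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Probe step: return the rows to cross with the current left tuple.
    // On a miss, an inner join drops the tuple (empty result) while a left
    // outer join substitutes a single all-null row so the left side survives.
    static <K, R> List<R> lookupReplicated(Map<K, List<R>> replicate, K key,
                                           R nullRow, boolean leftOuter) {
        List<R> matches = replicate.get(key);
        if (matches != null) {
            return matches;                            // normal inner match
        }
        return leftOuter ? Collections.singletonList(nullRow)
                         : Collections.<R>emptyList();
    }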

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov  3 23:27:41 2009
@@ -2008,15 +2008,15 @@
        // For all types of join we create LOJoin and mark what type of join it is.
        (
                [<USING> ("\"replicated\"" { 
-                       if(isOuter) {
-                           throw new ParseException("Replicated join does not support (left|right|full) outer joins");
-                       }
+                 if(isFullOuter || isRightOuter) {
+                     throw new ParseException("Replicated join does not support (right|full) outer joins");
+                 }
                                    frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
                            }
                        | "\"repl\"" { 
-                                   if(isOuter) {
-                        throw new ParseException("Replicated join does not support (left|right|full) outer joins");
-                    }
+                                   if(isFullOuter || isRightOuter) {
+                           throw new ParseException("Replicated join does not support (right|full) outer joins");
+                 }
                                    frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
                                }
                    |"\"skewed\"" {
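
With the parser change above, only right and full outer replicated joins are
rejected; a left outer replicated join now parses. Sketched in the style of
the tests below (aliases hypothetical):

    // Now accepted:
    pigServer.registerQuery("C = join A by $0 left, B by $0 using \"replicated\";");
    // Still rejected with "Replicated join does not support (right|full)
    // outer joins":
    //   C = join A by $0 right, B by $0 using "replicated";
    //   C = join A by $0 full,  B by $0 using "replicated";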

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Tue Nov  3 23:27:41 2009
@@ -22,8 +22,11 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -56,6 +59,7 @@
 
 public class TestFRJoin extends TestCase{
     private static final String INPUT_FILE = "testFrJoinInput.txt";
+    private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
     private File tmpFile;
@@ -77,11 +81,21 @@
                 input[k++] = si + "\t" + j;
         }
         Util.createInputFile(cluster, INPUT_FILE, input);
+        
+        String[] input2 = new String[2*(LOOP_SIZE/2)];
+        k = 0;
+        for(int i = 1; i <= LOOP_SIZE/2; i++) {
+            String si = i + "";
+            for(int j=1;j<=LOOP_SIZE/2;j++)
+                input2[k++] = si + "\t" + j;
+        }
+        Util.createInputFile(cluster, INPUT_FILE2, input2);
     }
 
     @After
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
+        Util.deleteFile(cluster, INPUT_FILE2 );
     }
     
     public static class FRJoin extends EvalFunc<DataBag>{
@@ -408,8 +422,83 @@
         Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
     }
+
+    @Test
+    public void testFRJoinOut8() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+        Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0 using \"replicated\";");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using \"replicated\";");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String Key = tuple.toDelimitedString(",");
+                hashFRJoin.put( Key, tuple);
+                dbfrj.add(tuple);
+                
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0;");
+            pigServer.registerQuery("D = join A by $1 left, B by $1;");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String Key = tuple.toDelimitedString(",");
+                hashJoin.put( Key, tuple);
+                dbshj.add(tuple);
+            }
+        }
+        Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+                
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+    }
     
     @Test
+    public void testFRJoinOut9() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+        Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0 using \"repl\";");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using \"repl\";");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String Key = tuple.toDelimitedString(",");
+                hashFRJoin.put( Key, tuple);
+                dbfrj.add(tuple);
+                
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0 left, B by $0;");
+            pigServer.registerQuery("D = join A by $1 left, B by $1;");
+            pigServer.registerQuery("E = union C,D;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            while(iter.hasNext()) {
+                Tuple tuple = iter.next();
+                String Key = tuple.toDelimitedString(",");
+                hashJoin.put( Key, tuple);
+                dbshj.add(tuple);
+            }
+        }
+        Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);        
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+    }
+        
+    @Test
     public void testFRJoinSch1() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Nov  3 23:27:41 2009
@@ -466,9 +466,20 @@
                     
                 } catch(Exception e) {
                     errCaught = true;
-                    assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+                    if( j == 0 || j == 1 ) {
+                        // This is after adding support for LeftOuter Join to replicated Join
+                        assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
+                    } else {
+                        assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+                    }                    
+                }
+                if( i == 0 && ( j == 0 || j== 1 ) ) {
+                    // This is after adding support for LeftOuter Join to replicated Join
+                    assertEquals(false, errCaught);
+                }
+                else {
+                    assertEquals(true, errCaught);
                 }
-                assertEquals(true, errCaught);
             }
             
         }

