Author: pradeepkth
Date: Mon Nov  2 20:20:05 2009
New Revision: 832086

URL: http://svn.apache.org/viewvc?rev=832086&view=rev
Log:
PIG-1035: support for skewed outer join (sriranjan via pradeepkth)

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov  2 20:20:05 2009
@@ -109,6 +109,8 @@
 
 BUG FIXES
 
+PIG-1035: support for skewed outer join (sriranjan via pradeepkth)
+
 PIG-1030: explain and dump not working with two UDFs inside inner plan of
 foreach (rding via pradeepkth)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov  2 20:20:05 2009
@@ -91,6 +91,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
@@ -1495,7 +1496,7 @@
                        pkg.setKeyType(type);
                        pkg.setResultType(DataType.TUPLE);
                        pkg.setNumInps(2);
-                       boolean[] inner = {true, true}; 
+                       boolean [] inner = op.getInnerFlags();
                        pkg.setInner(inner);            
                        pkg.visit(this);       
                        compiledInputs = new MapReduceOper[] {curMROp};
@@ -1504,23 +1505,22 @@
                        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
                        List<Boolean> flat = new ArrayList<Boolean>();
                        
-                       PhysicalPlan ep = new PhysicalPlan();
-                       POProject prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-                       prj.setColumn(1);
-                       prj.setOverloaded(false);
-                       prj.setResultType(DataType.BAG);
-                       ep.add(prj);
-                       eps.add(ep);
-                       flat.add(true);
-
-                       ep = new PhysicalPlan();
-                       prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-                       prj.setColumn(2);
-                       prj.setOverloaded(false);
-                       prj.setResultType(DataType.BAG);
-                       ep.add(prj);
-                       eps.add(ep);
-                       flat.add(true);
+                       PhysicalPlan ep;
+                       // Add corresponding POProjects
+                       for (int i=0; i < 2; i++ ) {
+                           ep = new PhysicalPlan();
+                           POProject prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+                           prj.setColumn(i+1);
+                           prj.setOverloaded(false);
+                           prj.setResultType(DataType.BAG);
+                           ep.add(prj);
+                           eps.add(ep);
+                           if (!inner[i]) {
+                               // Add an empty bag for outer join
+                               CompilerUtils.addEmptyBagOuterJoin(ep, 
op.getSchema(i));
+                           }
+                           flat.add(true);
+                       }
 
                        POForEach fe = new POForEach(new 
OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
                        fe.setResultType(DataType.TUPLE);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Nov  2 20:20:05 2009
@@ -62,6 +62,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 
+import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.LinkedMultiMap;
 import org.apache.pig.impl.util.MultiMap;
 
@@ -834,7 +835,7 @@
                        POSkewedJoin skj;
                        try {
                                skj = new POSkewedJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
-                                                                               
        inp);
+                                                                               
        inp, loj.getInnerFlags());
                                skj.setJoinPlans(joinPlans);
                        }
                        catch (Exception e) {
@@ -843,6 +844,30 @@
                                throw new 
LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
                        }
                        skj.setResultType(DataType.TUPLE);
+                       
+            boolean[] innerFlags = loj.getInnerFlags();
+                       for (int i=0; i < inputs.size(); i++) {
+                               LogicalOperator op = inputs.get(i);
+                               if (!innerFlags[i]) {
+                                       try {
+                                               Schema s = op.getSchema();
+                                               // if the schema cannot be determined
+                                               if (s == null) {
+                                                       throw new 
FrontendException();
+                                               }
+                                               skj.addSchema(s);
+                                       } catch (FrontendException e) {
+                                               int errCode = 2015;
+                                               String msg = "Couldn't set the 
schema for outer join" ;
+                                               throw new 
LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                                       }
+                               } else {
+                                   // This will never be retrieved. It just guarantees that the index will be valid when
+                                   // MRCompiler is trying to read the schema
+                                   skj.addSchema(null);
+                               }
+                       }
+                       
                        currentPlan.add(skj);
 
                        for (LogicalOperator op : inputs) {
@@ -1045,8 +1070,8 @@
         Schema inputSchema = null;
         try {
             inputSchema = joinInput.getSchema();
-            
-            
+         
+          
             if(inputSchema == null) {
                 int errCode = 1105;
                 String msg = "Input (" + joinInput.getAlias() + ") " +
@@ -1059,71 +1084,7 @@
             throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e);
         }
         
-        // we currently have POProject[bag] as the only operator in the plan
-        // If the bag is an empty bag, we should replace
-        // it with a bag with one tuple with null fields so that when we flatten
-        // we do not drop records (flatten will drop records if the bag is left
-        // as an empty bag) and actually project nulls for the fields in 
-        // the empty bag
-        
-        // So we need to get to the following state:
-        // POProject[Bag]
-        //         \     
-        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
-        //                         \     |    /
-        //                          POBinCond
-        
-        POProject relationProject = (POProject) fePlan.getRoots().get(0);
-        try {
-            
-            // condition of the bincond
-            POProject relationProjectForIsEmpty = relationProject.clone();
-            fePlan.add(relationProjectForIsEmpty);
-            String scope = relationProject.getOperatorKey().scope;
-            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
-            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
-            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, 
NodeIdGenerator.getGenerator().
-                        getNextNodeId(scope)), -1, null, isEmptySpec, 
(EvalFunc) f);
-            isEmpty.setResultType(DataType.BOOLEAN);
-            fePlan.add(isEmpty);
-            fePlan.connect(relationProjectForIsEmpty, isEmpty);
-            
-            // lhs of bincond (const bag with null fields)
-            ConstantExpression ce = new ConstantExpression(new 
OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            // the following should give a tuple with the
-            // required number of nulls
-            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
-            for(int i = 0; i < inputSchema.size(); i++) {
-                t.set(i, null);
-            }
-            List<Tuple> bagContents = new ArrayList<Tuple>(1);
-            bagContents.add(t);
-            DataBag bg = new NonSpillableDataBag(bagContents);
-            ce.setValue(bg);
-            ce.setResultType(DataType.BAG);
-            //this operator doesn't have any predecessors
-            fePlan.add(ce);
-            
-            //rhs of bincond is the original project
-            // let's set up the bincond now
-            POBinCond bincond = new POBinCond(new OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            bincond.setCond(isEmpty);
-            bincond.setLhs(ce);
-            bincond.setRhs(relationProject);
-            bincond.setResultType(DataType.BAG);
-            fePlan.add(bincond);
-
-            fePlan.connect(isEmpty, bincond);
-            fePlan.connect(ce, bincond);
-            fePlan.connect(relationProject, bincond);
-
-        } catch (Exception e) {
-            throw new PlanException("Error setting up outerjoin", e);
-        }
-        
+        CompilerUtils.addEmptyBagOuterJoin(fePlan, inputSchema);
         
     }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Nov  2 20:20:05 2009
@@ -18,6 +18,7 @@
 
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
@@ -42,6 +44,10 @@
 
        
        private static final long serialVersionUID = 1L;
+       private boolean[] mInnerFlags;
+       
+       // The schema is used only by the MRCompiler to support outer join
+       transient private List<Schema> inputSchema = new ArrayList<Schema>();
        
        transient private static Log log = 
LogFactory.getLog(POSkewedJoin.class);
        
@@ -51,19 +57,30 @@
     private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans;
 
     public POSkewedJoin(OperatorKey k)  {
-        this(k,-1,null);
+        this(k,-1,null, null);
     }
 
     public POSkewedJoin(OperatorKey k, int rp) {
-        this(k, rp, null);
+        this(k, rp, null, null);
     }
 
-    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) {
-        this(k, -1, inp);
+    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean 
[]flags) {
+        this(k, -1, inp, flags);
     }
 
-    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-        super(k,rp,inp);      
+    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, 
boolean []flags) {
+        super(k,rp,inp);
+        if (flags != null) {
+               // copy the inner flags
+               mInnerFlags = new boolean[flags.length];
+               for (int i = 0; i < flags.length; i++) {
+                       mInnerFlags[i] = flags[i];
+               }
+        }
+    }
+    
+    public boolean[] getInnerFlags() {
+       return mInnerFlags;
     }
     
     public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() {
@@ -93,5 +110,13 @@
        public boolean supportsMultipleOutputs() {      
                return false;
        }
+       
+       public void addSchema(Schema s) {
+               inputSchema.add(s);
+       }
+       
+       public Schema getSchema(int i) {
+               return inputSchema.get(i);
+       }
 
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Nov  2 20:20:05 2009
@@ -2019,10 +2019,7 @@
                     }
                                    frj=parseJoin(gis, lp, 
LOJoin.JOINTYPE.REPLICATED);
                                }
-                   |"\"skewed\"" { 
-                           if(isOuter) {
-                        throw new ParseException("Skewed join does not support 
(left|right|full) outer joins");
-                    }
+                   |"\"skewed\"" {
                            skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); 
                        }
                    |"\"merge\"" { 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=832086&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Nov  2 20:20:05 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.IsEmpty;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+/* 
+ * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
+ * 
+ */
+public class CompilerUtils {
+
+    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema 
inputSchema) throws PlanException {
+        // we currently have POProject[bag] as the only operator in the plan
+        // If the bag is an empty bag, we should replace
+        // it with a bag with one tuple with null fields so that when we flatten
+        // we do not drop records (flatten will drop records if the bag is left
+        // as an empty bag) and actually project nulls for the fields in 
+        // the empty bag
+        
+        // So we need to get to the following state:
+        // POProject[Bag]
+        //         \     
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
+        //                        \      |    POProject[Bag]             
+        //                         \     |    /
+        //                          POBinCond
+        POProject relationProject = (POProject) fePlan.getRoots().get(0);
+        try {
+            
+            // condition of the bincond
+            POProject relationProjectForIsEmpty = relationProject.clone();
+            fePlan.add(relationProjectForIsEmpty);
+            String scope = relationProject.getOperatorKey().scope;
+            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, 
NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)), -1, null, isEmptySpec, 
(EvalFunc) f);
+            isEmpty.setResultType(DataType.BOOLEAN);
+            fePlan.add(isEmpty);
+            fePlan.connect(relationProjectForIsEmpty, isEmpty);
+            
+            // lhs of bincond (const bag with null fields)
+            ConstantExpression ce = new ConstantExpression(new 
OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            // the following should give a tuple with the
+            // required number of nulls
+            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+            for(int i = 0; i < inputSchema.size(); i++) {
+                t.set(i, null);
+            }
+            List<Tuple> bagContents = new ArrayList<Tuple>(1);
+            bagContents.add(t);
+            DataBag bg = new NonSpillableDataBag(bagContents);
+            ce.setValue(bg);
+            ce.setResultType(DataType.BAG);
+            //this operator doesn't have any predecessors
+            fePlan.add(ce);
+            
+            //rhs of bincond is the original project
+            // let's set up the bincond now
+            POBinCond bincond = new POBinCond(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            bincond.setCond(isEmpty);
+            bincond.setLhs(ce);
+            bincond.setRhs(relationProject);
+            bincond.setResultType(DataType.BAG);
+            fePlan.add(bincond);
+
+            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(ce, bincond);
+            fePlan.connect(relationProject, bincond);
+
+        } catch (Exception e) {
+            throw new PlanException("Error setting up outerjoin", e);
+        }
+       
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Mon Nov  2 20:20:05 2009
@@ -456,7 +456,7 @@
         lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
         lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
         String[] types = new String[] { "left", "right", "full" };
-        String[] joinTypes = new String[] { "replicated", "repl", "skewed", 
"merge" };
+        String[] joinTypes = new String[] { "replicated", "repl", "merge" };
         for (int i = 0; i < types.length; i++) {
             for(int j = 0; j < joinTypes.length; j++) {
                 boolean errCaught = false;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Nov  2 20:20:05 2009
@@ -324,6 +324,43 @@
         return;
     }
     
+    public void testSkewedJoinOuter() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as 
(id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as 
(id,name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id left, B by id using 
\"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id right, B by id using 
\"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id full, B by id using 
\"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            fail("Should support outer join in skewed join");
+        }
+        return;
+    }
+    
     // pig 1048
     public void testSkewedJoinOneValue() throws IOException {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as 
(id,name);");


Reply via email to