Author: pradeepkth Date: Mon Nov 2 20:20:05 2009 New Revision: 832086 URL: http://svn.apache.org/viewvc?rev=832086&view=rev Log: PIG-1035: support for skewed outer join (sriranjan via pradeepkth)
Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Nov 2 20:20:05 2009 @@ -109,6 +109,8 @@ BUG FIXES +PIG-1035: support for skewed outer join (sriranjan via pradeepkth) + PIG-1030: explain and dump not working with two UDFs inside inner plan of foreach (rding via pradeepkth) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 2 20:20:05 2009 @@ -91,6 +91,7 @@ import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType; +import org.apache.pig.impl.util.CompilerUtils; import org.apache.pig.impl.util.MultiMap; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; @@ -1495,7 +1496,7 @@ pkg.setKeyType(type); pkg.setResultType(DataType.TUPLE); pkg.setNumInps(2); - boolean[] inner = {true, true}; + boolean [] inner = op.getInnerFlags(); pkg.setInner(inner); pkg.visit(this); compiledInputs = new MapReduceOper[] {curMROp}; @@ -1504,23 +1505,22 @@ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>(); List<Boolean> flat = new ArrayList<Boolean>(); - PhysicalPlan ep = new PhysicalPlan(); - POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); - prj.setColumn(1); - prj.setOverloaded(false); - prj.setResultType(DataType.BAG); - ep.add(prj); - eps.add(ep); - flat.add(true); - - ep = new PhysicalPlan(); - prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); - prj.setColumn(2); - prj.setOverloaded(false); - prj.setResultType(DataType.BAG); - ep.add(prj); - eps.add(ep); - flat.add(true); + PhysicalPlan ep; + // Add corresponding POProjects + for (int i=0; i < 2; i++ ) { + ep = new PhysicalPlan(); + POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prj.setColumn(i+1); + prj.setOverloaded(false); + prj.setResultType(DataType.BAG); + ep.add(prj); + eps.add(ep); + if (!inner[i]) { + // Add an empty bag for outer join + CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i)); + } + flat.add(true); + } POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat); fe.setResultType(DataType.TUPLE); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Nov 2 20:20:05 2009 @@ -62,6 +62,7 @@ import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.CompilerUtils; import org.apache.pig.impl.util.LinkedMultiMap; import org.apache.pig.impl.util.MultiMap; @@ -834,7 +835,7 @@ POSkewedJoin skj; try { skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(), - inp); + inp, loj.getInnerFlags()); skj.setJoinPlans(joinPlans); } catch (Exception e) { @@ -843,6 +844,30 @@ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } skj.setResultType(DataType.TUPLE); + + boolean[] innerFlags = loj.getInnerFlags(); + for (int i=0; i < inputs.size(); i++) { + LogicalOperator op = inputs.get(i); + if (!innerFlags[i]) { + try { + Schema s = op.getSchema(); + // if the schema cannot be determined + if (s == null) { + throw new FrontendException(); + } + skj.addSchema(s); + } catch (FrontendException e) { + int errCode = 2015; + String msg = "Couldn't set the schema for outer join" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + } else { + // This will never be retrieved. It just guarantees that the index will be valid when + // MRCompiler is trying to read the schema + skj.addSchema(null); + } + } + currentPlan.add(skj); for (LogicalOperator op : inputs) { @@ -1045,8 +1070,8 @@ Schema inputSchema = null; try { inputSchema = joinInput.getSchema(); - - + + if(inputSchema == null) { int errCode = 1105; String msg = "Input (" + joinInput.getAlias() + ") " + @@ -1059,71 +1084,7 @@ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } - // we currently have POProject[bag] as the only operator in the plan - // If the bag is an empty bag, we should replace - // it with a bag with one tuple with null fields so that when we flatten - // we do not drop records (flatten will drop records if the bag is left - // as an empty bag) and actually project nulls for the fields in - // the empty bag - - // So we need to get to the following state: - // POProject[Bag] - // \ - // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields) - // \ | POProject[Bag] - // \ | / - // POBinCond - - POProject relationProject = (POProject) fePlan.getRoots().get(0); - try { - - // condition of the bincond - POProject relationProjectForIsEmpty = relationProject.clone(); - fePlan.add(relationProjectForIsEmpty); - String scope = relationProject.getOperatorKey().scope; - FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName()); - Object f = PigContext.instantiateFuncFromSpec(isEmptySpec); - POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator(). - getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f); - isEmpty.setResultType(DataType.BOOLEAN); - fePlan.add(isEmpty); - fePlan.connect(relationProjectForIsEmpty, isEmpty); - - // lhs of bincond (const bag with null fields) - ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, - NodeIdGenerator.getGenerator().getNextNodeId(scope))); - // the following should give a tuple with the - // required number of nulls - Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size()); - for(int i = 0; i < inputSchema.size(); i++) { - t.set(i, null); - } - List<Tuple> bagContents = new ArrayList<Tuple>(1); - bagContents.add(t); - DataBag bg = new NonSpillableDataBag(bagContents); - ce.setValue(bg); - ce.setResultType(DataType.BAG); - //this operator doesn't have any predecessors - fePlan.add(ce); - - //rhs of bincond is the original project - // let's set up the bincond now - POBinCond bincond = new POBinCond(new OperatorKey(scope, - NodeIdGenerator.getGenerator().getNextNodeId(scope))); - bincond.setCond(isEmpty); - bincond.setLhs(ce); - bincond.setRhs(relationProject); - bincond.setResultType(DataType.BAG); - fePlan.add(bincond); - - fePlan.connect(isEmpty, bincond); - fePlan.connect(ce, bincond); - fePlan.connect(relationProject, bincond); - - } catch (Exception e) { - throw new PlanException("Error setting up outerjoin", e); - } - + CompilerUtils.addEmptyBagOuterJoin(fePlan, inputSchema); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Nov 2 20:20:05 2009 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; +import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; @@ -26,6 +27,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.MultiMap; @@ -42,6 +44,10 @@ private static final long serialVersionUID = 1L; + private boolean[] mInnerFlags; + + // The schema is used only by the MRCompiler to support outer join + transient private List<Schema> inputSchema = new ArrayList<Schema>(); transient private static Log log = LogFactory.getLog(POSkewedJoin.class); @@ -51,19 +57,30 @@ private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans; public POSkewedJoin(OperatorKey k) { - this(k,-1,null); + this(k,-1,null, null); } public POSkewedJoin(OperatorKey k, int rp) { - this(k, rp, null); + this(k, rp, null, null); } - public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) { - this(k, -1, inp); + public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean []flags) { + this(k, -1, inp, flags); } - public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) { - super(k,rp,inp); + public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, boolean []flags) { + super(k,rp,inp); + if (flags != null) { + // copy the inner flags + mInnerFlags = new boolean[flags.length]; + for (int i = 0; i < flags.length; i++) { + mInnerFlags[i] = flags[i]; + } + } + } + + public boolean[] getInnerFlags() { + return mInnerFlags; } public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() { @@ -93,5 +110,13 @@ public boolean supportsMultipleOutputs() { return false; } + + public void addSchema(Schema s) { + inputSchema.add(s); + } + + public Schema getSchema(int i) { + return inputSchema.get(i); + } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Nov 2 20:20:05 2009 @@ -2019,10 +2019,7 @@ } frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); } - |"\"skewed\"" { - if(isOuter) { - throw new ParseException("Skewed join does not support (left|right|full) outer joins"); - } + |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); } |"\"merge\"" { Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=832086&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Nov 2 20:20:05 2009 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.impl.util; + +import java.util.ArrayList; +import java.util.List; + + +import org.apache.pig.EvalFunc; +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.builtin.IsEmpty; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.NonSpillableDataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; + +/* + * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler + * + */ +public class CompilerUtils { + + public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException { + // we currently have POProject[bag] as the only operator in the plan + // If the bag is an empty bag, we should replace + // it with a bag with one tuple with null fields so that when we flatten + // we do not drop records (flatten will drop records if the bag is left + // as an empty bag) and actually project nulls for the fields in + // the empty bag + + // So we need to get to the following state: + // POProject[Bag] + // \ + // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields) + // \ | POProject[Bag] + // \ | / + // POBinCond + POProject relationProject = (POProject) fePlan.getRoots().get(0); + try { + + // condition of the bincond + POProject relationProjectForIsEmpty = relationProject.clone(); + fePlan.add(relationProjectForIsEmpty); + String scope = relationProject.getOperatorKey().scope; + FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName()); + Object f = PigContext.instantiateFuncFromSpec(isEmptySpec); + POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator(). + getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f); + isEmpty.setResultType(DataType.BOOLEAN); + fePlan.add(isEmpty); + fePlan.connect(relationProjectForIsEmpty, isEmpty); + + // lhs of bincond (const bag with null fields) + ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, + NodeIdGenerator.getGenerator().getNextNodeId(scope))); + // the following should give a tuple with the + // required number of nulls + Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size()); + for(int i = 0; i < inputSchema.size(); i++) { + t.set(i, null); + } + List<Tuple> bagContents = new ArrayList<Tuple>(1); + bagContents.add(t); + DataBag bg = new NonSpillableDataBag(bagContents); + ce.setValue(bg); + ce.setResultType(DataType.BAG); + //this operator doesn't have any predecessors + fePlan.add(ce); + + //rhs of bincond is the original project + // let's set up the bincond now + POBinCond bincond = new POBinCond(new OperatorKey(scope, + NodeIdGenerator.getGenerator().getNextNodeId(scope))); + bincond.setCond(isEmpty); + bincond.setLhs(ce); + bincond.setRhs(relationProject); + bincond.setResultType(DataType.BAG); + fePlan.add(bincond); + + fePlan.connect(isEmpty, bincond); + fePlan.connect(ce, bincond); + fePlan.connect(relationProject, bincond); + + } catch (Exception e) { + throw new PlanException("Error setting up outerjoin", e); + } + + } + +} Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Mon Nov 2 20:20:05 2009 @@ -456,7 +456,7 @@ lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); "); lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); "); String[] types = new String[] { "left", "right", "full" }; - String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" }; + String[] joinTypes = new String[] { "replicated", "repl", "merge" }; for (int i = 0; i < types.length; i++) { for(int j = 0; j < joinTypes.length; j++) { boolean errCaught = false; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Nov 2 20:20:05 2009 @@ -324,6 +324,43 @@ return; } + public void testSkewedJoinOuter() throws IOException { + pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);"); + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join A by id left, B by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("C = join A by id right, B by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("C = join A by id full, B by id using \"skewed\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + } catch(Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + fail("Should support outer join in skewed join"); + } + return; + } + // pig 1048 public void testSkewedJoinOneValue() throws IOException { pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");