Author: gates Date: Thu Oct 22 19:32:22 2009 New Revision: 828825 URL: http://svn.apache.org/viewvc?rev=828825&view=rev Log: PIG-984: Add map side grouping for data that is already collected when it is read into the map.
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Oct 22 19:32:22 2009 @@ -26,6 +26,9 @@ IMPROVEMENTS +PIG-984: Add map side grouping for data that is already collected when +it is read into the map (rding via gates). 
+ PIG-1025: Add ability to set job priority from Pig Latin script (kevinweil via gates) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Oct 22 19:32:22 2009 @@ -79,6 +79,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -900,7 +901,22 @@ throw new MRCompilerException(msg, errCode, PigException.BUG, e); } } - + + public void visitCollectedGroup(POCollectedGroup op) throws VisitorException { + try{ + nonBlocking(op); + List<PhysicalPlan> plans = op.getPlans(); + if(plans!=null) + for(PhysicalPlan ep : plans) + addUDFs(ep); + phyToMROpMap.put(op, curMROp); + }catch(Exception e){ + int errCode = 2034; + String msg = "Error compiling operator " + op.getClass().getSimpleName(); + throw new MRCompilerException(msg, errCode, PigException.BUG, e); + } + } + @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Oct 22 19:32:22 2009 @@ -63,6 +63,12 @@ } @Override + public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{ + super.visitCollectedGroup(mg); + mg.setParentPlan(parent); + } + + @Override public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{ gr.setParentPlan(parent); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Oct 22 19:32:22 2009 @@ -23,6 +23,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -80,6 +81,13 @@ // merge join 
present endOfAllInputFlag = true; } + + @Override + public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException { + // map side group present + endOfAllInputFlag = true; + } + /** * @return if end of all input is present */ @@ -87,4 +95,5 @@ return endOfAllInputFlag; } } -} \ No newline at end of file +} + Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Oct 22 19:32:22 2009 @@ -645,13 +645,23 @@ @Override public void visit(LOCogroup cg) throws VisitorException { - boolean currentPhysicalPlan = false; + + if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) { + + translateCollectedCogroup(cg); + + } else { + + translateRegularCogroup(cg); + } + } + + private void translateRegularCogroup(LOCogroup cg) throws VisitorException { String scope = cg.getOperatorKey().scope; List<LogicalOperator> inputs = cg.getInputs(); - + POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey( - scope, nodeGen.getNextNodeId(scope)), cg - .getRequestedParallelism()); + scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism()); POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen .getNextNodeId(scope)), cg.getRequestedParallelism()); @@ -669,8 +679,7 @@ int count = 0; Byte type = null; for (LogicalOperator op : inputs) { - List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans() - .get(op); + List<LogicalPlan> plans = 
(List<LogicalPlan>)cg.getGroupByPlans().get(op); POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( scope, nodeGen.getNextNodeId(scope)), cg .getRequestedParallelism()); @@ -682,9 +691,8 @@ .spawnChildWalker(lp); pushWalker(childWalker); mCurrentWalker.walk(this); - exprPlans.add((PhysicalPlan) currentPlan); + exprPlans.add(currentPlan); popWalker(); - } currentPlan = currentPlans.pop(); try { @@ -697,8 +705,8 @@ try { physOp.setIndex(count++); } catch (ExecException e1) { - int errCode = 2058; - String msg = "Unable to set index on newly create POLocalRearrange."; + int errCode = 2058; + String msg = "Unable to set index on newly create POLocalRearrange."; throw new VisitorException(msg, errCode, PigException.BUG, e1); } if (plans.size() > 1) { @@ -720,8 +728,8 @@ String msg = "Invalid physical operators in the physical plan" ; throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } - } + poPackage.setKeyType(type); poPackage.setResultType(DataType.TUPLE); poPackage.setNumInps(count); @@ -729,6 +737,59 @@ logToPhyMap.put(cg, poPackage); } + private void translateCollectedCogroup(LOCogroup cg) throws VisitorException { + String scope = cg.getOperatorKey().scope; + List<LogicalOperator> inputs = cg.getInputs(); + + // can have only one input + LogicalOperator op = inputs.get(0); + List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op); + POCollectedGroup physOp = new POCollectedGroup(new OperatorKey( + scope, nodeGen.getNextNodeId(scope))); + + List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>(); + currentPlans.push(currentPlan); + for (LogicalPlan lp : plans) { + currentPlan = new PhysicalPlan(); + PlanWalker<LogicalOperator, LogicalPlan> childWalker = + mCurrentWalker.spawnChildWalker(lp); + pushWalker(childWalker); + mCurrentWalker.walk(this); + exprPlans.add(currentPlan); + popWalker(); + } + currentPlan = currentPlans.pop(); + + try { + physOp.setPlans(exprPlans); + } catch (PlanException 
pe) { + int errCode = 2071; + String msg = "Problem with setting up map group's plans."; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe); + } + Byte type = null; + if (plans.size() > 1) { + type = DataType.TUPLE; + physOp.setKeyType(type); + } else { + type = exprPlans.get(0).getLeaves().get(0).getResultType(); + physOp.setKeyType(type); + } + physOp.setResultType(DataType.TUPLE); + + currentPlan.add(physOp); + + try { + currentPlan.connect(logToPhyMap.get(op), physOp); + } catch (PlanException e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan" ; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + logToPhyMap.put(cg, physOp); + } + @Override protected void visit(LOJoin loj) throws VisitorException { Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct 22 19:32:22 2009 @@ -59,6 +59,15 @@ visit(); popWalker(); } + + public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{ + List<PhysicalPlan> inpPlans = mg.getPlans(); + for (PhysicalPlan plan : inpPlans) { + pushWalker(mCurrentWalker.spawnChildWalker(plan)); + visit(); + popWalker(); + } + } public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{ List<PhysicalPlan> inpPlans = lr.getPlans(); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java 
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Oct 22 19:32:22 2009 @@ -156,6 +156,9 @@ else if(node instanceof POLocalRearrange){ sb.append(planString(((POLocalRearrange)node).getPlans())); } + else if(node instanceof POCollectedGroup){ + sb.append(planString(((POCollectedGroup)node).getPlans())); + } else if(node instanceof POSort){ sb.append(planString(((POSort)node).getSortPlans())); } Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=828825&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Oct 22 19:32:22 2009 @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; + +/** + * The collected group operator is a special operator used when users give + * the hint 'using "collected"' in a group by clause. It implements a map-side + * group that collects all records for a given key into a buffer. When it sees + * a key change it will emit the key and bag for records it had buffered. 
+ * It will assume that all keys for a given record are collected together + * and thus there is no need to buffer across keys. + * + */ +public class POCollectedGroup extends PhysicalOperator { + + private static final List<PhysicalPlan> EMPTY_PLAN_LIST = new ArrayList<PhysicalPlan>(); + + protected static final long serialVersionUID = 1L; + + protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); + +// private Log log = LogFactory.getLog(getClass()); + + protected List<PhysicalPlan> plans; + + protected List<ExpressionOperator> leafOps; + + protected byte keyType; + + private Tuple output; + + private DataBag outputBag = null; + + private Object prevKey = null; + + public POCollectedGroup(OperatorKey k) { + this(k, -1, null); + } + + public POCollectedGroup(OperatorKey k, int rp) { + this(k, rp, null); + } + + public POCollectedGroup(OperatorKey k, List<PhysicalOperator> inp) { + this(k, -1, inp); + } + + public POCollectedGroup(OperatorKey k, int rp, List<PhysicalOperator> inp) { + super(k, rp, inp); + leafOps = new ArrayList<ExpressionOperator>(); + output = mTupleFactory.newTuple(2); + } + + @Override + public void visit(PhyPlanVisitor v) throws VisitorException { + v.visitCollectedGroup(this); + } + + @Override + public String name() { + return "Map side group " + "[" + DataType.findTypeName(resultType) + + "]" + "{" + DataType.findTypeName(keyType) + "}" + " - " + mKey.toString(); + } + + @Override + public boolean supportsMultipleInputs() { + return false; + } + + @Override + public boolean supportsMultipleOutputs() { + return false; + } + + /** + * Overridden since the attachment of the new input should cause the old + * processing to end.
+ */ + @Override + public void attachInput(Tuple t) { + super.attachInput(t); + } + + @SuppressWarnings("unchecked") + @Override + public Result getNext(Tuple t) throws ExecException { + + // Since the output is buffered, we need to flush the last + // set of records when the close method is called by mapper. + if (this.parentPlan.endOfAllInput) { + if (outputBag != null) { + Tuple tup = mTupleFactory.newTuple(2); + tup.set(0, prevKey); + tup.set(1, outputBag); + outputBag = null; + return new Result(POStatus.STATUS_OK, tup); + } + + return new Result(POStatus.STATUS_EOP, null); + } + + Result inp = null; + Result res = null; + + while (true) { + inp = processInput(); + if (inp.returnStatus == POStatus.STATUS_EOP || + inp.returnStatus == POStatus.STATUS_ERR) { + break; + } + + if (inp.returnStatus == POStatus.STATUS_NULL) { + continue; + } + + for (PhysicalPlan ep : plans) { + ep.attachInput((Tuple)inp.result); + } + + List<Result> resLst = new ArrayList<Result>(); + for (ExpressionOperator op : leafOps) { + + switch (op.getResultType()){ + case DataType.BAG: + res = op.getNext(dummyBag); + break; + case DataType.BOOLEAN: + res = op.getNext(dummyBool); + break; + case DataType.BYTEARRAY: + res = op.getNext(dummyDBA); + break; + case DataType.CHARARRAY: + res = op.getNext(dummyString); + break; + case DataType.DOUBLE: + res = op.getNext(dummyDouble); + break; + case DataType.FLOAT: + res = op.getNext(dummyFloat); + break; + case DataType.INTEGER: + res = op.getNext(dummyInt); + break; + case DataType.LONG: + res = op.getNext(dummyLong); + break; + case DataType.MAP: + res = op.getNext(dummyMap); + break; + case DataType.TUPLE: + res = op.getNext(dummyTuple); + break; + } + if (res.returnStatus != POStatus.STATUS_OK) { + return new Result(); + } + resLst.add(res); + } + + Tuple tup = constructOutput(resLst,(Tuple)inp.result); + Object curKey = tup.get(0); + + // the first time, just create a new buffer and continue. 
+ if (prevKey == null && outputBag == null) { + prevKey = curKey; + outputBag = BagFactory.getInstance().newDefaultBag(); + outputBag.add((Tuple)tup.get(1)); + continue; + } + + // no key change + if (prevKey == null && curKey == null) { + outputBag.add((Tuple)tup.get(1)); + continue; + } + + // no key change + if (prevKey != null && curKey != null && ((Comparable)curKey).compareTo(prevKey) == 0) { + outputBag.add((Tuple)tup.get(1)); + continue; + } + + // key change + Tuple tup2 = mTupleFactory.newTuple(2); + tup2.set(0, prevKey); + tup2.set(1, outputBag); + res.result = tup2; + + prevKey = curKey; + outputBag = BagFactory.getInstance().newDefaultBag(); + outputBag.add((Tuple)tup.get(1)); + + return res; + } + + return inp; + } + + protected Tuple constructOutput(List<Result> resLst, Tuple value) throws ExecException{ + + // Construct key + Object key; + + if (resLst.size() > 1) { + Tuple t = mTupleFactory.newTuple(resLst.size()); + int i = -1; + for (Result res : resLst) { + t.set(++i, res.result); + } + key = t; + } + else { + key = resLst.get(0).result; + } + + // Put key and value in a tuple and return + output.set(0, key); + output.set(1, value); + + return output; + } + + public byte getKeyType() { + return keyType; + } + + public void setKeyType(byte keyType) { + this.keyType = keyType; + } + + public List<PhysicalPlan> getPlans() { + return (plans == null) ? 
EMPTY_PLAN_LIST : plans; + } + + public void setPlans(List<PhysicalPlan> plans) throws PlanException { + this.plans = plans; + leafOps.clear(); + for (PhysicalPlan plan : plans) { + ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); + leafOps.add(leaf); + } + } +} Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Oct 22 19:32:22 2009 @@ -18,6 +18,7 @@ package org.apache.pig.impl.logicalLayer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -43,6 +44,14 @@ public class LOCogroup extends RelationalOperator { private static final long serialVersionUID = 2L; + + /** + * Enum for the type of group + */ + public static enum GROUPTYPE { + REGULAR, // Regular (co)group + COLLECTED // Collected group + }; /** * Cogroup contains a list of logical operators corresponding to the @@ -53,6 +62,7 @@ private boolean[] mIsInner; private static Log log = LogFactory.getLog(LOCogroup.class); private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans; + private GROUPTYPE mGroupType; /** * @@ -70,9 +80,34 @@ OperatorKey k, MultiMap<LogicalOperator, LogicalPlan> groupByPlans, boolean[] isInner) { + this(plan, k, groupByPlans, GROUPTYPE.REGULAR, isInner); + } + + /** + * + * @param plan + * LogicalPlan this operator is a part of. 
+ * @param k + * OperatorKey for this operator + * @param groupByPlans + * the group by columns + * @param type + * the type of this group + * @param isInner + * indicates whether the cogroup is inner for each relation + */ + public LOCogroup( + LogicalPlan plan, + OperatorKey k, + MultiMap<LogicalOperator, LogicalPlan> groupByPlans, + GROUPTYPE type, + boolean[] isInner) { super(plan, k); mGroupByPlans = groupByPlans; - mIsInner = isInner; + if (isInner != null) { + mIsInner = Arrays.copyOf(isInner, isInner.length); + } + mGroupType = type; } public List<LogicalOperator> getInputs() { @@ -95,6 +130,10 @@ mIsInner = inner; } + public GROUPTYPE getGroupType() { + return mGroupType; + } + @Override public String name() { return "CoGroup " + mKey.scope + "-" + mKey.id; Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=828825&r1=828824&r2=828825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Oct 22 19:32:22 2009 @@ -249,7 +249,7 @@ return fname; } - LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{ + LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{ log.trace("Entering parseCogroup"); log.debug("LogicalPlan: " + lp); @@ -286,7 +286,7 @@ isInner[i] = gi.isInner; } - LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, isInner); + LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, type, isInner); lp.add(cogroup); log.debug("Added operator " + cogroup.getClass().getName() + " 
object " + cogroup + " to the logical plan " + lp); @@ -388,7 +388,7 @@ for (int i = 0; i < n; i++) { (gis.get(i)).isInner = true; } - LogicalOperator cogroup = parseCogroup(gis, lp); + LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR); lp.add(cogroup); log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan"); @@ -676,7 +676,21 @@ } log.trace("Exiting attachPlan"); } - + + boolean isColumnProjectionsOrStar(CogroupInput cgi) { + if (cgi == null || cgi.plans == null || cgi.plans.size() == 0) { + return false; + } + for (LogicalPlan keyPlan: cgi.plans) { + for (LogicalOperator op : keyPlan) { + if(!(op instanceof LOProject)) { + return false; + } + } + } + return true; + } + } @@ -1623,20 +1637,40 @@ LogicalOperator CogroupClause(LogicalPlan lp) : { - CogroupInput gi; - ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); - LogicalOperator cogroup; - log.trace("Entering CoGroupClause"); + CogroupInput gi; + ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); + LogicalOperator cogroup = null; + log.trace("Entering CoGroupClause"); } { - (gi = GroupItem(lp) { gis.add(gi); } - ("," gi = GroupItem(lp) { gis.add(gi); })*) - { - cogroup = parseCogroup(gis, lp); - log.trace("Exiting CoGroupClause"); - return cogroup; - } + (gi = GroupItem(lp) { gis.add(gi); } + ("," gi = GroupItem(lp) { gis.add(gi); })* + ( + [<USING> ("\"collected\"" { + if (gis.size() != 1) { + throw new ParseException("Collected group is only supported for single input"); + } + if (!isColumnProjectionsOrStar(gis.get(0))) { + throw new ParseException("Collected group is only supported for columns or star projection"); + } + cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED); + } + ) + ] + ) + ) + + { + if (cogroup != null) { + log.trace("Exiting CoGroupClause"); + return cogroup; + } + + cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR); + log.trace("Exiting CoGroupClause"); + return cogroup; + } } Added: 
hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=828825&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Thu Oct 22 19:32:22 2009 @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.io.*; +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.pig.EvalFunc; +import org.apache.pig.ExecType; +import org.apache.pig.FuncSpec; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.utils.TestHelper; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.plan.OperatorKey; +import org.junit.After; +import org.junit.Before; + +public class TestCollectedGroup extends TestCase { + private static final String INPUT_FILE = "MapSideGroupInput.txt"; + + private PigServer pigServer; + private MiniCluster cluster = MiniCluster.buildCluster(); + + public TestCollectedGroup() throws ExecException, IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } + + @Before + public void setUp() throws Exception { + createFiles(); + } + + private void createFiles() throws IOException { + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("100\tapple1\t95"); + w.println("100\tapple2\t83"); + w.println("100\tapple2\t74"); + w.println("200\torange1\t100"); + w.println("200\torange2\t89"); + w.println("300\tstrawberry\t64"); + w.println("300\tstrawberry\t64"); + w.println("300\tstrawberry\t76"); + w.println("400\tpear\t78"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + } + + @After + public void tearDown() throws Exception { + new File(INPUT_FILE).delete(); + Util.deleteFile(cluster, INPUT_FILE); + } + + public void testPOMapsideGroupNoNullPlans() throws IOException { + POCollectedGroup pmg = new 
POCollectedGroup(new OperatorKey()); + List<PhysicalPlan> plans = pmg.getPlans(); + + Assert.assertTrue(plans != null); + Assert.assertTrue(plans.size() == 0); + } + + public void testMapsideGroupParserNoSupportForMultipleInputs() throws IOException { + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + pigServer.registerQuery("C = group A by id, B by id using \"collected\";"); + fail("Pig doesn't support multi-input collected group."); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), + "Error during parsing. Collected group is only supported for single input"); + } + } + + public void testMapsideGroupParserNoSupportForGroupAll() throws IOException { + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + pigServer.registerQuery("B = group A all using \"collected\";"); + fail("Pig doesn't support collected group all."); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), + "Error during parsing. Collected group is only supported for columns or star projection"); + } + } + + public void testMapsideGroupParserNoSupportForByExpression() throws IOException { + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + pigServer.registerQuery("B = group A by id*grade using \"collected\";"); + fail("Pig doesn't support collected group by expression."); + } catch (Exception e) { + Assert.assertEquals(e.getMessage(), + "Error during parsing. 
Collected group is only supported for columns or star projection"); + } + } + + public void testMapsideGroupByOneColumn() throws IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + DataBag dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("B = group A by id using \"collected\";"); + pigServer.registerQuery("C = foreach B generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while (iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = group A by id;"); + pigServer.registerQuery("E = foreach D generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while (iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + + } catch (Exception e) { + fail(e.getMessage()); + } + } + + public void testMapsideGroupByMultipleColumns() throws IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + DataBag dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("B = group A by (id, name) using \"collected\";"); + pigServer.registerQuery("C = foreach B generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while (iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = group A by (id, name);"); + pigServer.registerQuery("E = foreach D generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while (iter.hasNext()) { + dbshj.add(iter.next()); + } + } + 
Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + + } catch (Exception e) { + fail(e.getMessage()); + } + } + + public void testMapsideGroupByStar() throws IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + + try { + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); + DataBag dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("B = group A by * using \"collected\";"); + pigServer.registerQuery("C = foreach B generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while (iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = group A by *;"); + pigServer.registerQuery("E = foreach D generate group, COUNT(A);"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while (iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); + + } catch (Exception e) { + fail(e.getMessage()); + } + } + +}