Author: olga Date: Thu Oct 9 12:23:51 2008 New Revision: 703233 URL: http://svn.apache.org/viewvc?rev=703233&view=rev Log: PIG-465: perf improvement - removing keys from the value
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Thu Oct 9 12:23:51 2008 @@ -275,3 +275,6 @@ PIG-457: report 100% on successful jobs 
only (shravanmn via olgan) PIG-471: ignoring status errors from hadoop (pradeepk via olgan) + + PIG-465: performance improvement - removing keys from the value (pradeepk + via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct 9 12:23:51 2008 @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.pig.data.DataType; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -393,7 +394,7 @@ func.setAlgebraicFunction(type); } - private void fixUpRearrange(POLocalRearrange rearrange) { + private void fixUpRearrange(POLocalRearrange rearrange) throws ExecException { // Set the projection to be the key PhysicalPlan newPlan = new PhysicalPlan(); String scope = rearrange.getOperatorKey().scope; @@ -404,8 +405,7 @@ newPlan.add(proj); List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1); plans.add(newPlan); - rearrange.setPlans(plans); - rearrange.setIndex(mKeyField); + rearrange.setPlansFromCombiner(plans); } private class AlgebraicPlanChecker extends PhyPlanVisitor { Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Oct 9 12:23:51 2008 @@ -381,7 +381,7 @@ Class comparator = PigContext.resolveClassName(compFuncSpec); if(ComparisonFunc.class.isAssignableFrom(comparator)) { jobConf.setMapperClass(PigMapReduce.MapWithComparator.class); - pack.setKeyType(DataType.TUPLE); + jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class); jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); jobConf.set("pig.usercomparator", "true"); jobConf.setOutputKeyClass(NullableTuple.class); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Thu Oct 9 12:23:51 2008 @@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; @@ -118,7 +119,11 @@ CombinerOptimizer co = new CombinerOptimizer(plan); co.visit(); } - + + // optimize key - value handling in package + POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan); + pkgAnnotator.visit(); + // check whether stream operator is present MRStreamHandler checker = new MRStreamHandler(plan); checker.visit(); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Oct 9 12:23:51 2008 @@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; @@ -136,6 +137,10 @@ 
co.visit(); } + // optimize key - value handling in package + POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan); + pkgAnnotator.visit(); + // check whether stream operator is present MRStreamHandler checker = new MRStreamHandler(plan); checker.visit(); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Oct 9 12:23:51 2008 @@ -48,6 +48,7 @@ import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.WrappedIOException; /** * This class is the static Mapper & Reducer classes that @@ -95,14 +96,26 @@ } } + /** + * This "specialized" map class is ONLY to be used in pig queries with + * order by a udf. A UDF used for comparison in the order by expects + * to be handed tuples. 
Hence this map class ensures that the "key" used + * in the order by is wrapped into a tuple (if it isn't already a tuple) + */ public static class MapWithComparator extends PigMapBase implements Mapper<Text, TargetedTuple, PigNullableWritable, Writable> { @Override public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException { - Object k = tuple.get(1); - Tuple keyTuple = tf.newTuple(k); + Object keyTuple = null; + if(keyType != DataType.TUPLE) { + Object k = tuple.get(1); + keyTuple = tf.newTuple(k); + } else { + keyTuple = tuple.get(1); + } + Byte index = (Byte)tuple.get(0); PigNullableWritable key = @@ -121,23 +134,23 @@ public static class Reduce extends MapReduceBase implements Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { - private final Log log = LogFactory.getLog(getClass()); + protected final Log log = LogFactory.getLog(getClass()); //The reduce plan - private PhysicalPlan rp; + protected PhysicalPlan rp; //The POPackage operator which is the //root of every Map Reduce plan is //obtained through the job conf. The portion //remaining after its removal is the reduce //plan - private POPackage pack; + protected POPackage pack; ProgressableReporter pigReporter; - private OutputCollector<PigNullableWritable, Writable> outputCollector; + protected OutputCollector<PigNullableWritable, Writable> outputCollector; - private boolean errorInReduce = false; + protected boolean errorInReduce = false; /** * Configures the Reduce plan, the POPackage operator @@ -231,7 +244,7 @@ * @throws ExecException * @throws IOException */ - private void runPipeline(PhysicalOperator leaf) throws ExecException, IOException { + protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException { while(true) { Tuple dummyTuple = null; @@ -307,4 +320,98 @@ } } + /** + * This "specialized" reduce class is ONLY to be used in pig queries with + * order by a udf. 
A UDF used for comparison in the order by expects + * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator) + * ensures that the "key" used in the order by is wrapped into a tuple (if it + * isn't already a tuple). This reduce class unwraps this tuple in the case where + * the map had wrapped it into a tuple and hands the "unwrapped" key to the POPackage + * for processing + */ public static class ReduceWithComparator extends PigMapReduce.Reduce { + + private byte keyType; + + /** + * Configures the Reduce plan, the POPackage operator + * and the reporter thread + */ + @Override + public void configure(JobConf jConf) { + super.configure(jConf); + keyType = pack.getKeyType(); + } + + /** + * The reduce function which packages the key and List<Tuple> + * into key, Bag<Tuple> after converting Hadoop type key into Pig type. + * The package result is either collected as is, if the reduce plan is + * empty or after passing through the reduce plan. + */ public void reduce(PigNullableWritable key, + Iterator<NullableTuple> tupIter, + OutputCollector<PigNullableWritable, Writable> oc, + Reporter reporter) throws IOException { + + // cache the collector for use in runPipeline() + // which could additionally be called from close() + this.outputCollector = oc; + pigReporter.setRep(reporter); + + // If the keyType is not a tuple, the MapWithComparator.collect() + // would have wrapped the key into a tuple so that the + // comparison UDF used in the order by can process it.
+ // We need to unwrap the key out of the tuple and hand it + // to the POPackage for processing + if(keyType != DataType.TUPLE) { + Tuple t = (Tuple)(key.getValueAsPigType()); + try { + key = HDataType.getWritableComparableTypes(t.get(0), keyType); + } catch (ExecException e) { + throw WrappedIOException.wrap(e); + } + } + + pack.attachInput(key, tupIter); + + try { + Tuple t=null; + Result res = pack.getNext(t); + if(res.returnStatus==POStatus.STATUS_OK){ + Tuple packRes = (Tuple)res.result; + + if(rp.isEmpty()){ + oc.collect(null, packRes); + return; + } + + rp.attachInput(packRes); + + List<PhysicalOperator> leaves = rp.getLeaves(); + + PhysicalOperator leaf = leaves.get(0); + runPipeline(leaf); + + } + + if(res.returnStatus==POStatus.STATUS_NULL) { + return; + } + + if(res.returnStatus==POStatus.STATUS_ERR){ + IOException ioe = new IOException("Packaging error while processing group"); + throw ioe; + } + + + } catch (ExecException e) { + IOException ioe = new IOException(e.getMessage()); + ioe.initCause(e.getCause()); + throw ioe; + } + } + + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct 9 12:23:51 2008 @@ -76,6 +76,10 @@ //do nothing } + public void visitCombinerPackage(POPostCombinerPackage pkg) throws VisitorException{ + //do nothing + } + public void visitPOForEach(POForEach nfe) throws VisitorException { List<PhysicalPlan> inpPlans = 
nfe.getInputPlans(); for (PhysicalPlan plan : inpPlans) { @@ -231,5 +235,19 @@ } + /** + * @param localRearrangeForIllustrate + * @throws VisitorException + */ + public void visitLocalRearrangeForIllustrate( + POLocalRearrangeForIllustrate lrfi) throws VisitorException { + List<PhysicalPlan> inpPlans = lrfi.getPlans(); + for (PhysicalPlan plan : inpPlans) { + pushWalker(mCurrentWalker.spawnChildWalker(plan)); + visit(); + } + + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Oct 9 12:23:51 2008 @@ -18,7 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.OperatorKey; @@ -47,31 +50,59 @@ /** * */ - private static final long serialVersionUID = 1L; + protected static final long serialVersionUID = 1L; - private static TupleFactory mTupleFactory = TupleFactory.getInstance(); + protected static TupleFactory mTupleFactory = TupleFactory.getInstance(); private Log log = LogFactory.getLog(getClass()); - List<PhysicalPlan> plans; + protected List<PhysicalPlan> plans; - List<ExpressionOperator> leafOps; + protected List<ExpressionOperator> leafOps; // The position of this LR in the package operator - byte index; + protected byte index; - byte keyType; + protected byte keyType; - private boolean mIsDistinct = false; + protected boolean mIsDistinct = false; - private boolean isCross = false; + protected boolean isCross = false; + + // map to store mapping of projected columns to + // the position in the "Key" where these will be projected to. + // We use this information to strip off these columns + // from the "Value" and in POPackage stitch the right "Value" + // tuple back by getting these columns from the "key". The goal + // is to reduce the amount of the data sent to Hadoop in the map. + // Example: a = load 'bla'; b = load 'bla'; c = cogroup a by ($2, $3), b by ($, $2) + // For the first input (a), the map would contain following key:value + // 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key) + // 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 1 corresponds to 2nd index in key) + private Map<Integer, Integer> mProjectedColsMap; // A place holder Tuple used in distinct case where we really don't // have any value to pass through. But hadoop gets cranky if we pass a // null, so we'll just create one instance of this empty tuple and // pass it for every row. We only get around to actually creating it if // mIsDistinct is set to true.
- private Tuple mFakeTuple = null; + protected Tuple mFakeTuple = null; + + // indicator whether the project in the inner plans + // is a project(*) - we set this ONLY when the project(*) + // is the ONLY thing in the cogroup by .. + private boolean mProjectStar = false; + + // marker to note that the "key" is a tuple + // this is required by POPackage to pick things + // off the "key" correctly to stitch together the + // "value" + private boolean isKeyTuple = false; + + private int mProjectedColsMapSize = 0; + + private ArrayList<Integer> minValuePositions; + private int minValuePositionsSize = 0; public POLocalRearrange(OperatorKey k) { this(k, -1, null); @@ -89,6 +120,7 @@ super(k, rp, inp); index = -1; leafOps = new ArrayList<ExpressionOperator>(); + mProjectedColsMap = new HashMap<Integer, Integer>(); } @Override @@ -206,12 +238,13 @@ resLst.add(res); } res.result = constructLROutput(resLst,(Tuple)inp.result); + return res; } return inp; } - private Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{ + protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{ //Construct key Object key; if(resLst.size()>1){ @@ -234,17 +267,70 @@ output.set(1, key); output.set(2, mFakeTuple); return output; + } else if(isCross){ + + for(int i=0;i<plans.size();i++) + value.getAll().remove(0); + //Put the index, key, and value + //in a tuple and return + output.set(0, new Byte(index)); + output.set(1, key); + output.set(2, value); + return output; } else { - if(isCross){ - for(int i=0;i<plans.size();i++) - value.getAll().remove(0); - } //Put the index, key, and value //in a tuple and return output.set(0, new Byte(index)); output.set(1, key); - output.set(2, value); + + // strip off the columns in the "value" which + // are present in the "key" + if(mProjectedColsMapSize != 0 || mProjectStar == true) { + + Tuple minimalValue = null; + if(!mProjectStar) { + if(minValuePositions == null) { + // the very first time, we will 
have to build + // the "value" tuple piecemeal but we can + // do better next time round + minValuePositions = new ArrayList<Integer>(); + minimalValue = mTupleFactory.newTuple(); + // look for individual columns that we are + // projecting + for (int i = 0; i < value.size(); i++) { + if(mProjectedColsMap.get(i) == null) { + // this column was not found in the "key" + // so send it in the "value" + minimalValue.append(value.get(i)); + minValuePositions.add(i); + } + } + minValuePositionsSize = minValuePositions.size(); + } else { + minimalValue = mTupleFactory.newTuple(minValuePositionsSize); + for(int i = 0; i < minValuePositionsSize; i++) { + minimalValue.set(i, value.get(minValuePositions.get(i))); + } + } + } else { + // for the project star case + // we would send out an empty tuple as + // the "value" since all elements are in the + // "key" + minimalValue = mTupleFactory.newTuple(); + + } + output.set(2, minimalValue); + + } else { + + // there were no columns in the "key" + // which we can strip off from the "value" + // so just send the value we got + output.set(2, value); + + } return output; } } @@ -264,9 +350,53 @@ public void setPlans(List<PhysicalPlan> plans) { this.plans = plans; leafOps.clear(); + int keyIndex = 0; // zero based index for fields in the key for (PhysicalPlan plan : plans) { - leafOps.add((ExpressionOperator)plan.getLeaves().get(0)); + ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); + leafOps.add(leaf); + + // don't optimize CROSS + if(!isCross) { + // Look for the leaf Ops which are POProject operators - get the + // the columns that these POProject Operators are projecting. + // They MUST be projecting either a column or '*'. + // Keep track of the columns which are being projected and + // the position in the "Key" where these will be projected to. 
+ // Then we can use this information to strip off these columns + // from the "Value" and in POPackage stitch the right "Value" + // tuple back by getting these columns from the "key". The goal + // is reduce the amount of the data sent to Hadoop in the map. + if(leaf instanceof POProject) { + POProject project = (POProject) leaf; + if(project.isStar()) { + if(plans.size() == 1) { + // note that we have a project * + mProjectStar = true; + // key will be a tuple in this case + isKeyTuple = true; + } else { + // TODO: currently "group by (*, somethingelse)" is NOT + // allowed. So we should never get here. But once it is + // allowed, we will need to handle it. For now just log + log.debug("Project * in group by not being optimized in key-value transfer"); + } + } else { + mProjectedColsMap.put(project.getColumn(), keyIndex); + } + if(project.getResultType() == DataType.TUPLE) + isKeyTuple = true; + } + keyIndex++; + } + } + if(keyIndex > 1) { + // make a note that the "key" is a tuple + // this is required by POPackage to pick things + // off the "key" correctly to stitch together the + // "value" + isKeyTuple = true; } + mProjectedColsMapSize = mProjectedColsMap.size(); } /** @@ -301,5 +431,74 @@ this.isCross = isCross; } + /** + * @return the mProjectedColsMap + */ + public Map<Integer, Integer> getProjectedColsMap() { + return mProjectedColsMap; + } + + /** + * @return the mProjectStar + */ + public boolean isProjectStar() { + return mProjectStar; + } + + /** + * @return the keyTuple + */ + public boolean isKeyTuple() { + return isKeyTuple; + } + + /** + * @param plans2 + * @throws ExecException + */ + public void setPlansFromCombiner(List<PhysicalPlan> plans) throws ExecException { + this.plans = plans; + leafOps.clear(); + mProjectedColsMap.clear(); + int keyIndex = 0; // zero based index for fields in the key + for (PhysicalPlan plan : plans) { + ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); + leafOps.add(leaf); + + // don't optimize 
CROSS + if(!isCross) { + // Look for the leaf Ops which are POProject operators - get + // the columns that these POProject Operators are projecting. + // Keep track of the columns which are being projected and + // the position in the "Key" where these will be projected to. + // Then we can use this information to strip off these columns + // from the "Value" and in POPostCombinerPackage stitch the right "Value" + // tuple back by getting these columns from the "key". The goal + // is to reduce the amount of the data sent to Hadoop in the map. + if(leaf instanceof POProject) { + POProject project = (POProject) leaf; + if(project.isStar()) { + log.error("Unexpected data during optimization"); + throw new ExecException("Unexpected data during optimization (Local rearrange" + + " in combiner has a project *" ); + } else { + mProjectedColsMap.put(project.getColumn(), keyIndex); + } + if(project.getResultType() == DataType.TUPLE) + isKeyTuple = true; + } + keyIndex++; + } + } + if(keyIndex > 1) { + // make a note that the "key" is a tuple + // this is required by POPackage to pick things + // off the "key" correctly to stitch together the + // "value" + isKeyTuple = true; + } + mProjectedColsMapSize = mProjectedColsMap.size(); + + } } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Oct 9 12:23:51 2008 @@ -17,8 +17,10 @@ */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.Pair; /** * The package operator that packages * the globally rearranged tuples into @@ -64,6 +67,11 @@ //The key being worked on Object key; + // marker to indicate if key is a tuple + protected boolean isKeyTuple = false; + // key as a Tuple object (if the key is a tuple) + protected Tuple keyAsTuple; + //key's type byte keyType; @@ -76,6 +84,14 @@ //on a particular input boolean[] inner; + // A mapping of input index to key information got from LORearrange + // for that index. The Key information is a pair of boolean, Map. + // The boolean indicates whether there is a lone project(*) in the + // cogroup by. 
If not, the Map has a mapping of column numbers in the + // "value" to column numbers in the "key" which contain the fields in + // the "value" + protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; + private final Log log = LogFactory.getLog(getClass()); protected static BagFactory mBagFactory = BagFactory.getInstance(); @@ -96,6 +112,7 @@ public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) { super(k, rp, inp); numInputs = -1; + keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); } @Override @@ -127,6 +144,11 @@ public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) { tupIter = inp; key = k.getValueAsPigType(); + if(isKeyTuple) { + // key is a tuple, cache the key as a + // tuple for use in the getNext() + keyAsTuple = (Tuple)key; + } } /** @@ -183,8 +205,64 @@ copy.set(i, val.get(i)); } */ - Tuple copy = mTupleFactory.newTuple(val.getAll()); - if (numInputs > 0) dbs[ntup.getIndex()].add(copy); + + Tuple copy = null; + // The "value (val)" that we just got may not + // be the complete "value". It may have some portions + // in the "key" (look in POLocalRearrange for more comments) + // If this is the case we need to stitch + // the "value" together. + int index = ntup.getIndex(); + Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = + keyInfo.get(index); + boolean isProjectStar = lrKeyInfo.first; + Map<Integer, Integer> keyLookup = lrKeyInfo.second; + int keyLookupSize = keyLookup.size(); + + if( keyLookupSize > 0) { + + // we have some fields of the "value" in the + // "key". 
+ copy = mTupleFactory.newTuple(); + int finalValueSize = keyLookupSize + val.size(); + int valIndex = 0; // an index for accessing elements from + // the value (val) that we have currently + for(int i = 0; i < finalValueSize; i++) { + Integer keyIndex = keyLookup.get(i); + if(keyIndex == null) { + // the field for this index is not in the + // key - so just take it from the "value" + // we were handed + copy.append(val.get(valIndex)); + valIndex++; + } else { + // the field for this index is in the key + if(isKeyTuple) { + // the key is a tuple, extract the + // field out of the tuple + copy.append(keyAsTuple.get(keyIndex)); + } else { + copy.append(key); + } + } + } + + } else if (isProjectStar) { + + log.info("In project star, keyAsTuple:" + keyAsTuple); + // the whole "value" is present in the "key" + copy = mTupleFactory.newTuple(keyAsTuple.getAll()); + + } else { + + // there is no field of the "value" in the + // "key" - so just make a copy of what we got + // as the "value" + copy = mTupleFactory.newTuple(val.getAll()); + + } + + if (numInputs > 0) dbs[index].add(copy); if(reporter!=null) reporter.progress(); } @@ -241,5 +319,26 @@ return clone; } + /** + * @param keyInfo the keyInfo to set + */ + public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) { + this.keyInfo = keyInfo; + } + + /** + * @param keyTuple the keyTuple to set + */ + public void setKeyTuple(boolean keyTuple) { + this.isKeyTuple = keyTuple; + } + + /** + * @return the keyInfo + */ + public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() { + return keyInfo; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=703233&r1=703232&r2=703233&view=diff 
============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Thu Oct 9 12:23:51 2008 @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +37,7 @@ import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.Pair; /** * The package operator that packages the globally rearranged tuples into * output format after the combiner stage. It differs from POPackage in that @@ -98,15 +100,39 @@ while (tupIter.hasNext()) { NullableTuple ntup = tupIter.next(); Tuple tup = (Tuple)ntup.getValueAsPigType(); - for (int i = 0; i < tup.size(); i++) { - if (mBags[i]) ((DataBag)fields[i]).add((Tuple)tup.get(i)); - else fields[i] = tup.get(i); + // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the + // group case and not in cogroups. So there should only + // be one LocalRearrange from which we get the keyInfo for + // which field in the value is in the key. This LocalRearrange + // has an index of -1. When we do support combiner in Cogroups + // THIS WILL NEED TO BE REVISITED. 
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = + keyInfo.get(0); // assumption: only groups are "combinable", hence index 0 + Map<Integer, Integer> keyLookup = lrKeyInfo.second; + int tupIndex = 0; // an index for accessing elements from + // the value (tup) that we have currently + for(int i = 0; i < mBags.length; i++) { + Integer keyIndex = keyLookup.get(i); + if(keyIndex == null) { + // the field for this index is not in the + // key - so just take it from the "value" + // we were handed - Currently THIS HAS TO BE A BAG + // In future if this changes, THIS WILL NEED TO BE + // REVISITED. + ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex)); + tupIndex++; + } else { + // the field for this index is in the key + fields[i] = key; + } } } - //Construct the output tuple by appending - //the key and all the above constructed bags - //and return it. + // The successor of the POPostCombinerPackage as of + // now SHOULD be a POForeach which has been adjusted + // to look for the key in the right place - so we will + // NOT be adding the key in the result here but merely + // putting all bags into a result tuple and returning it. 
Tuple res; res = mTupleFactory.newTuple(mBags.length); for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Oct 9 12:23:51 2008 @@ -10,6 +10,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate; import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup; import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput; @@ -52,7 +53,7 @@ for(LogicalOperator lo : inputs) { List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo); - POLocalRearrange physOp = new POLocalRearrange(new OperatorKey( + POLocalRearrangeForIllustrate physOp = new POLocalRearrangeForIllustrate(new OperatorKey( scope, nodeGen.getNextNodeId(scope)), cg .getRequestedParallelism()); List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>(); Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java Thu Oct 9 12:23:51 2008 @@ -34,4 +34,12 @@ first = f; second = s; } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "[" + first.toString() +"," + second.toString() + "]"; + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Oct 9 12:23:51 2008 @@ -84,15 +84,29 @@ int size=0; for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){ Tuple t = (Tuple)res.result; + String key = (String)t.get(1); Tuple val = (Tuple)t.get(2); + // The input data has 2 columns of which the first + // is the key + // With the optimized LocalRearrange, the part + // of the "value" present in the "key" is + // excluded from the "value". 
So to reconstruct + // the true "value", create a tuple with "key" in + // first position and the "value" (val) we currently + // have in the second position + assertEquals(1, val.size()); + + Tuple actualVal = new DefaultTuple(); + actualVal.append(key); + actualVal.append(val.get(0)); //Check if the index is same as input index assertEquals((byte)0, (byte)(Byte)t.get(0)); //Check if the input bag contains the value tuple - assertTrue(TestHelper.bagContains(db, val)); + assertTrue(TestHelper.bagContains(db, actualVal)); //Check if the input key and the output key are same - String inpKey = (String)val.get(0); + String inpKey = (String)actualVal.get(0); assertEquals(0, inpKey.compareTo((String)t.get(1))); ++size; } @@ -127,10 +141,23 @@ int size=0; for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){ Tuple t = (Tuple)res.result; + Tuple key = (Tuple)t.get(1); Tuple val = (Tuple)t.get(2); + + // The input data has 2 columns of which both + // are the key. + // With the optimized LocalRearrange, the part + // of the "value" present in the "key" is + // excluded from the "value". 
So in this case, + // the "value" coming out of the LocalRearrange + // would be an empty tuple + assertEquals(0, val.size()); + //Check if the index is same as input index assertEquals((byte)0, (byte)(Byte)t.get(0)); + // reconstruct value from tuple + val = key; //Check if the input baf contains the value tuple assertTrue(TestHelper.bagContains(db, val)); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Thu Oct 9 12:23:51 2008 @@ -30,7 +30,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead; import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup; import org.apache.pig.data.BagFactory; @@ -78,7 +78,7 @@ p1.add(prj1); List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>(); in1.add(p1); - POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r + POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r .nextLong()), -1, inputs1); lr1.setPlans(in1); lr1.setIndex(0); @@ -92,7 +92,7 @@ p2.add(prj2); List<PhysicalPlan> in2 = new 
LinkedList<PhysicalPlan>(); in2.add(p2); - POLocalRearrange lr2 = new POLocalRearrange(new OperatorKey("", r + POLocalRearrangeForIllustrate lr2 = new POLocalRearrangeForIllustrate(new OperatorKey("", r .nextLong()), -1, inputs2); lr2.setPlans(in2); lr2.setIndex(1); @@ -172,7 +172,7 @@ p1.add(prj1); List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>(); in1.add(p1); - POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r + POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r .nextLong()), -1, inputs1); lr1.setPlans(in1); lr1.setIndex(0); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=703233&r1=703232&r2=703233&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Thu Oct 9 12:23:51 2008 @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.Pair; import org.apache.pig.test.utils.GenRandomData; import org.apache.pig.test.utils.TestHelper; import org.junit.After; @@ -79,6 +81,18 @@ pop.setInner(inner); PigNullableWritable k = HDataType.getWritableComparableTypes(key, (byte)0); pop.attachInput(k, db.iterator()); + + // we are not doing any optimization to remove + // parts of the "value" which are present in the "key" in this + // unit test - so set up the "keyInfo" accordingly in + // the POPackage + Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = + new 
HashMap<Integer, Pair<Boolean, Map<Integer,Integer>>>(); + Pair<Boolean, Map<Integer, Integer>> p = + new Pair<Boolean, Map<Integer, Integer>>(false, new HashMap<Integer, Integer>()); + keyInfo.put(0, p); + keyInfo.put(1, p); + pop.setKeyInfo(keyInfo); Tuple t = null; Result res = null; res = (Result) pop.getNext(t);