Author: daijy Date: Wed Nov 18 18:58:34 2009 New Revision: 881877 URL: http://svn.apache.org/viewvc?rev=881877&view=rev Log: PIG-1060: MultiQuery optimization throws error for multi-level splits
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Wed Nov 18 18:58:34 2009 @@ -196,7 +196,9 @@ PIG-1001: Generate more meaningful error message when one input file does not exist (daijy) -Release 0.5.0 - Unreleased +PIG-1060: MultiQuery optimization throws error for multi-level splits (rding via daijy) + +Release 0.5.0 INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Nov 18 18:58:34 2009 @@ -407,11 +407,16 @@ NoopFilterRemover fRem = new NoopFilterRemover(plan); fRem.visit(); - // reduces the number of MROpers in the MR plan generated - // by multi-query (multi-store) script. 
- MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan); - mqOptimizer.visit(); - + boolean isMultiQuery = + "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.multiquery","true")); + + if (isMultiQuery) { + // reduces the number of MROpers in the MR plan generated + // by multi-query (multi-store) script. + MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan); + mqOptimizer.visit(); + } + // removes unnecessary stores (as can happen with splits in // some cases.). This has to run after the MultiQuery and // NoopFilterRemover. Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Nov 18 18:58:34 2009 @@ -621,58 +621,7 @@ // in inner plans of any POSplit operators return curIndex; } - - private int setBaseIndexOnDemux(int initial, PODemux demuxOp) - throws VisitorException { - int index = initial; - demuxOp.setBaseIndex(index++); - - List<PhysicalPlan> pls = demuxOp.getPlans(); - for (PhysicalPlan pl : pls) { - PhysicalOperator leaf = pl.getLeaves().get(0); - if (leaf instanceof POLocalRearrange) { - POLocalRearrange lr = (POLocalRearrange)leaf; - try { - // if the baseindex is set on the demux, then - // POLocalRearranges in its inner plan should really - // be sending an index out by adding the base index - // This is because we would be replicating the demux - // as many times as there are inner plans in the demux - // hence the index coming out of POLocalRearranges - // needs to be 
adjusted accordingly - lr.setMultiQueryIndex(initial + lr.getIndex()); - } catch (ExecException e) { - int errCode = 2136; - String msg = "Internal Error. Unable to set multi-query index for optimization."; - throw new OptimizerException(msg, errCode, PigException.BUG, e); - } - } - PhysicalOperator root = pl.getRoots().get(0); - if (root instanceof PODemux) { - index = setBaseIndexOnDemux(index, (PODemux)root); - } else { - index++; - } - } - return index; - } - - private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) { - int index = initial; - pkgOp.setBaseIndex(index++); - - List<POPackage> pkgs = pkgOp.getPackages(); - for (POPackage pkg : pkgs) { - if (pkg instanceof POMultiQueryPackage) { - POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg; - index = setBaseIndexOnPackage(index, mpkg); - } else { - index++; - } - } - return index; - } - + private void mergeOneReducePlanWithIndex(PhysicalPlan from, PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException { POPackage pk = (POPackage)from.getRoots().get(0); @@ -684,41 +633,55 @@ // with the new indexed key addShiftedKeyInfoIndex(initial, pk); } + + int total = current - initial; + POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0); + int pkCount = 0; if (pk instanceof POMultiQueryPackage) { - POMultiQueryPackage mpkg = (POMultiQueryPackage)pk; - setBaseIndexOnPackage(initial, mpkg); - // we should update the keyinfo map of the - // POPackage objects in the POMultiQueryPackage to - // have the shifted index - The index now will be - // starting from "initial" going up to "current" - // ORed with the multi query bit mask - int retIndex = addShiftedKeyInfoIndex(initial, current, mpkg); - if(retIndex != current) { - int errCode = 2146; - String msg = "Internal Error. 
Inconsistency in key index found during optimization."; - throw new OptimizerException(msg, errCode, PigException.BUG); + List<POPackage> pkgs = ((POMultiQueryPackage)pk).getPackages(); + for (POPackage p : pkgs) { + pkg.addPackage(p); + pkCount++; } - } - - PhysicalOperator root = from.getRoots().get(0); - if (root instanceof PODemux) { - PODemux demux = (PODemux)root; - setBaseIndexOnDemux(initial, demux); - } - - POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0); - for (int i=initial; i<current; i++) { + addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk); + } else { pkg.addPackage(pk); + pkCount = 1; } + if (pkCount != total) { + int errCode = 2146; + String msg = "Internal Error. Inconsistency in key index found during optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG); + } + boolean[] keyPos = pk.getKeyPositionsInTuple(); PODemux demux = (PODemux)to.getLeaves().get(0); - for (int i=initial; i<current; i++) { + int plCount = 0; + PhysicalOperator root = from.getRoots().get(0); + if (root instanceof PODemux) { + // flattening the inner plans of the demux operator. + // This is based on the fact that if a plan has a demux + // operator, then it's the only operator in the plan. + List<PhysicalPlan> pls = ((PODemux)root).getPlans(); + for (PhysicalPlan pl : pls) { + demux.addPlan(pl, keyPos); + plCount++; + } + demux.addIsKeyWrappedList(((PODemux)root).getIsKeyWrappedList()); + } else { demux.addPlan(from, mapKeyType, keyPos); + plCount = 1; + } + + if (plCount != total) { + int errCode = 2146; + String msg = "Internal Error. 
Inconsistency in key index found during optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG); } - + if (demux.isSameMapKeyType()) { pkg.setKeyType(pk.getKeyType()); } else { @@ -772,11 +735,7 @@ */ private int addShiftedKeyInfoIndex(int initialIndex, int onePastEndIndex, POMultiQueryPackage mpkg) throws OptimizerException { - // recursively iterate over the packages in the - // POMultiQueryPackage adding a shifted keyInfoIndex entry - // in the packages in order going from initialIndex upto - // onePastEndIndex (exclusive) flattening out any nested - // packages in nested POMultiqueryPackages as we traverse + List<POPackage> pkgs = mpkg.getPackages(); // if we have lesser pkgs than (onePastEndIndex - initialIndex) // its because one or more of the pkgs is a POMultiQueryPackage which @@ -794,12 +753,8 @@ int curIndex = initialIndex; while (i < end) { POPackage pkg = pkgs.get(i); - if(pkg instanceof POMultiQueryPackage) { - curIndex = addShiftedKeyInfoIndex(curIndex, onePastEndIndex, (POMultiQueryPackage)pkg); - } else { - addShiftedKeyInfoIndex(curIndex, pkg); - curIndex++; - } + addShiftedKeyInfoIndex(curIndex, pkg); + curIndex++; i++; } return curIndex; // could be used in a caller who recursively called this function @@ -810,37 +765,11 @@ PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException { POPackage cpk = (POPackage)from.getRoots().get(0); from.remove(cpk); - - if (cpk instanceof POMultiQueryPackage) { - POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk; - setBaseIndexOnPackage(initial, mpkg); - } PODemux demux = (PODemux)to.getLeaves().get(0); boolean isSameKeyType = demux.isSameMapKeyType(); - PhysicalOperator leaf = from.getLeaves().get(0); - if (leaf instanceof POLocalRearrange) { - POLocalRearrange clr = (POLocalRearrange)leaf; - try { - clr.setMultiQueryIndex(initial); - } catch (ExecException e) { - int errCode = 2136; - String msg = "Internal Error. 
Unable to set multi-query index for optimization."; - throw new OptimizerException(msg, errCode, PigException.BUG, e); - } - - // change the map key type to tuple when - // multiple splittees have different map key types - if (!isSameKeyType) { - clr.setKeyType(DataType.TUPLE); - } - } else if (leaf instanceof PODemux) { - PODemux locDemux = (PODemux)leaf; - setBaseIndexOnDemux(initial, locDemux); - } - POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0); // if current > initial + 1, it means we had @@ -848,17 +777,35 @@ // merge. In that case we would have changed the indices // of the POLocalRearranges in the split to be in the // range initial to current. To handle key, value pairs - // coming out of those POLocalRearranges, we replicate - // the Package as many times (in this case, the package - // would have to be a POMultiQueryPackage since we had - // a POSplit in the map). That Package would have a baseindex - // correctly set (in the beginning of this method) and would - // be able to handle the outputs from the different + // coming out of those POLocalRearranges, we add + // the Packages in the 'from' POMultiQueryPackage (in this case, + // it has to be a POMultiQueryPackage since we had + // a POSplit in the map) to the 'to' POMultiQueryPackage. + // These Packages would have correct positions in the package + // list and would be able to handle the outputs from the different // POLocalRearranges. - for (int i=initial; i<current; i++) { + int total = current - initial; + int pkCount = 0; + if (cpk instanceof POMultiQueryPackage) { + List<POPackage> pkgs = ((POMultiQueryPackage)cpk).getPackages(); + for (POPackage p : pkgs) { + pkg.addPackage(p); + if (!isSameKeyType) { + p.setKeyType(DataType.TUPLE); + } + pkCount++; + } + } else { pkg.addPackage(cpk); + pkCount = 1; } - + + if (pkCount != total) { + int errCode = 2146; + String msg = "Internal Error. 
Inconsistency in key index found during optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG); + } + // all packages should have the same key type if (!isSameKeyType) { cpk.setKeyType(DataType.TUPLE); @@ -868,11 +815,52 @@ boolean[] keyPos = cpk.getKeyPositionsInTuple(); - // See comment above for why we replicated the Package - // in the from plan - for the same reason, we replicate - // the Demux operators now. - for (int i=initial; i<current; i++) { + // See comment above for why we flatten the Packages + // in the from plan - for the same reason, we flatten + // the inner plans of Demux operator now. + int plCount = 0; + PhysicalOperator leaf = from.getLeaves().get(0); + if (leaf instanceof PODemux) { + List<PhysicalPlan> pls = ((PODemux)leaf).getPlans(); + for (PhysicalPlan pl : pls) { + demux.addPlan(pl, mapKeyType, keyPos); + POLocalRearrange lr = (POLocalRearrange)pl.getLeaves().get(0); + try { + lr.setMultiQueryIndex(initial + plCount++); + } catch (ExecException e) { + int errCode = 2136; + String msg = "Internal Error. Unable to set multi-query index for optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + + // change the map key type to tuple when + // multiple splittees have different map key types + if (!isSameKeyType) { + lr.setKeyType(DataType.TUPLE); + } + } + } else { demux.addPlan(from, mapKeyType, keyPos); + POLocalRearrange lr = (POLocalRearrange)from.getLeaves().get(0); + try { + lr.setMultiQueryIndex(initial + plCount++); + } catch (ExecException e) { + int errCode = 2136; + String msg = "Internal Error. Unable to set multi-query index for optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + + // change the map key type to tuple when + // multiple splittees have different map key types + if (!isSameKeyType) { + lr.setKeyType(DataType.TUPLE); + } + } + + if (plCount != total) { + int errCode = 2146; + String msg = "Internal Error. 
Inconsistency in key index found during optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG); } } @@ -933,73 +921,7 @@ // > index + 1 int incIndex = mergeOneMapPlanWithIndex( mrOp.mapPlan, splitOp, index, sameKeyType); - - // In the combine and reduce plans the Demux and POMultiQueryPackage - // operators' baseIndex is set whenever the incIndex above is > index + 1 - // What does this 'baseIndex' mean - here is an attempt to explain it: - // Consider a map - reduce plan layout as shown below (the comments - // apply even if a combine plan was present) - An explanation of the "index" - // and "baseIndex" fields follows: - // The numbers in parenthesis () are "index" values - Note that in multiquery - // optimizations these indices are actually ORed with a bitmask (0x80) on the - // map side in the LocalRearrange. The POMultiQueryPackage and PODemux operators - // then remove this bitmask to restore the original index values. In the commentary - // below, indices will be referred to without this bitmask - the bitmask is only - // to identify the multiquery optimization during comparsion - for details see the comments - // in POLocalRearrange.setIndex(). - // The numbers in brackets [] are "baseIndex" values. These baseIndex values are - // used by POMultiQueryPackage and PODemux to calculate the an arrayList index which - // they use to pick the right package or inner plan respectively. All this is needed - // since on the map side the indices are assigned after flattening all POLocalRearranges - // including those nested in Splits into one flat space (as can be noticed by the - // numbering of the indices under the split in the example below). The optimizer then - // duplicates the POMultiQueryPackage and Demux inner plan corresponding to the split - // in the reduce plan (the entities with * in the figure below). 
Now if a key with index '1' - // is emitted, it goes to the first POMultiQueryPackage in the reduce plan below, which - // then picks the second package in its arraylist of packages which is a - // POMultiQueryPackage (the first with a * below). This POMultiQueryPackage then picks - // the first package in its list (it arrives at this arraylist index of 0 by subtracting - // the baseIndex (1) from the index coming in (1)). So the record emitted by LocalRearrange(1) - // reaches the right package. Likewise, if LocalRearrange(2) emits a key,value they would have - // an index 2. The first POMultiQueryPackage (with baseIndex 0 below) would pick the 3rd package - // (arraylist index 2 arrived at by doing index - baseIndex which is 2 - 0). This is the - // duplicated POMultiQueryPackage with baseIndex 1. This would inturn pick the second package - // in its arraylist (arraylist index 1 arrived at by doing index - baseIndex which is 2 - 1) - // The idea is that through duplication we make it easier to determine which POPackage to pick. - // The exact same logic is used by PODemux to pick the right inner plan from its arraylist of - // inner plans. - - // The arrows in the figure below show the correspondence between the different operators - // and plans .i.e the end of an arrow points to the operator or plan which consumes - // data coming from the root of the arrow - - // If you look at the layout below column-wise, each "column" is a MROper - // which is being merged into the parent MROper - the Split represents a - // MROper which inside of it has 2 MROpers merged in. - // A star (*) next to an operator means it is the same object as the - // other operator with a star(*) - Essentially the same object reference - // is copied twice. 
- // LocalRearrange(0) Split LocalRearrange(3) - // | / \ | - // | LocalRearrange(1) LocalRearrange(2) | - // | | MAP PLAN | | - // ----|-------------|---------------------------|-------------------|-------------------- - // | | REDUCE PLAN | | - // | | POMultiQueryPackage[0] | | - // V V | contains V V - // [ POPackage, POMultiQueryPackage[1]*,POMultiQueryPackage[1]*, POPackage] - // | / \ / \ | - // | POPackage POPackage POPackage POPackage | - // | \ | | - // | \ Demux[0] | | - // V V | contains V V - // [ planOfRedOperators,planWithDemux*, planWithDemux*, planOfRedOperators] - // / | | \ - // / Demux[1] Demux[1] \ - // V | | V - // [planOfRedOps,planOfRedOps][planOfRedOps,planOfRedOps] - // - + // merge the combiner plan if (comPl != null) { if (!mrOp.combinePlan.isEmpty()) { @@ -1011,6 +933,7 @@ throw new OptimizerException(msg, errCode, PigException.BUG); } } + // merge the reducer plan mergeOneReducePlanWithIndex( mrOp.reducePlan, redPl, index, incIndex, mrOp.mapKeyType); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Wed Nov 18 18:58:34 2009 @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.BitSet; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -61,14 +62,7 @@ private static Result eop = new Result(POStatus.STATUS_EOP, null); transient private Log log = LogFactory.getLog(getClass()); - - /* - * The base index of 
this demux. In the case of - * a demux contained in another demux, the index - * passed in must be shifted before it can be used. - */ - private int baseIndex = 0; - + /* * The list of sub-plans the inner plan is composed of */ @@ -178,7 +172,7 @@ @Override public String name() { - return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " + mKey.toString(); + return "Demux" + isKeyWrapped + " - " + mKey.toString(); } @Override @@ -190,32 +184,35 @@ public boolean supportsMultipleOutputs() { return false; } - + /** - * Sets the base index of this demux. - * - * @param idx the base index + * Returns the list of inner plans. + * + * @return the list of the nested plans */ - public void setBaseIndex(int idx) { - baseIndex = idx; + public List<PhysicalPlan> getPlans() { + return myPlans; } /** - * Returns the base index of this demux + * Returns the list of booleans that indicate whether the + * key needs to be unwrapped for the corresponding plan. * - * @return the base index + * @return the list of isKeyWrapped boolean values */ - public int getBaseIndex() { - return baseIndex; + public List<Boolean> getIsKeyWrappedList() { + return Collections.unmodifiableList(isKeyWrapped); } /** - * Returns the list of inner plans. - * - * @return the list of the nested plans + * Adds a list of IsKeyWrapped boolean values + * + * @param lst the list of boolean values to add */ - public List<PhysicalPlan> getPlans() { - return myPlans; + public void addIsKeyWrappedList(List<Boolean> lst) { + for (Boolean b : lst) { + isKeyWrapped.add(b); + } } /** @@ -232,6 +229,12 @@ isKeyWrapped.add(mapKeyType == DataType.TUPLE ?
false : true); keyPositions.add(keyPos); } + + public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) { + myPlans.add(inPlan); + processedSet.set(myPlans.size()-1); + keyPositions.add(keyPos); + } @Override public Result getNext(Tuple t) throws ExecException { @@ -357,8 +360,7 @@ // the POLocalRearrange operator and passed to this operator // by POMultiQueryPackage int index = fld.getIndex(); - index &= idxPart; - index -= baseIndex; + index &= idxPart; PhysicalPlan pl = myPlans.get(index); if (!(pl.getRoots().get(0) instanceof PODemux)) { Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Wed Nov 18 18:58:34 2009 @@ -64,8 +64,6 @@ private List<POPackage> packages = new ArrayList<POPackage>(); transient private PigNullableWritable myKey; - - private int baseIndex = 0; /** * Constructs an operator with the specified key. 
@@ -111,7 +109,7 @@ @Override public String name() { - return "MultiQuery Package[" + baseIndex +"] - " + getOperatorKey().toString(); + return "MultiQuery Package - " + getOperatorKey().toString(); } @Override @@ -174,7 +172,6 @@ int index = (int)origIndex; index &= idxPart; - index -= baseIndex; if (index >= packages.size() || index < 0) { int errCode = 2140; @@ -221,21 +218,4 @@ return res; } - /** - * Sets the base index of this operator - * - * @param baseIndex the base index of this operator - */ - public void setBaseIndex(int baseIndex) { - this.baseIndex = baseIndex; - } - - /** - * Returns the base index of this operator - * - * @return the base index of this operator - */ - public int getBaseIndex() { - return baseIndex; - } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=881877&r1=881876&r2=881877&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed Nov 18 18:58:34 2009 @@ -34,22 +34,28 @@ import junit.framework.Assert; import junit.framework.TestCase; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.util.Progressable; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.StoreConfig; import org.apache.pig.StoreFunc; -import org.apache.pig.backend.executionengine.util.ExecTools; import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.backend.executionengine.util.ExecTools; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -57,20 +63,12 @@ import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.Operator; import org.apache.pig.impl.plan.OperatorPlan; -import org.apache.pig.tools.grunt.GruntParser; import org.apache.pig.impl.util.LogUtils; +import org.apache.pig.tools.grunt.GruntParser; import org.apache.pig.tools.pigscript.parser.ParseException; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.jobcontrol.Job; -import org.apache.hadoop.util.Progressable; public class TestMultiQuery extends TestCase { @@ -91,6 +89,137 @@ } @Test + public void testMultiQueryJiraPig1060() { + + // test case: + + String INPUT_FILE = "pig-1060.txt"; + + try { + + PrintWriter 
w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("apple\t2"); + w.println("apple\t12"); + w.println("orange\t3"); + w.println("orange\t23"); + w.println("strawberry\t10"); + w.println("strawberry\t34"); + + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("data = load '" + INPUT_FILE + + "' as (name:chararray, gid:int);"); + myPig.registerQuery("f1 = filter data by gid < 5;"); + myPig.registerQuery("g1 = group f1 by name;"); + myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);"); + myPig.registerQuery("store p1 into '/tmp/output1';"); + + myPig.registerQuery("f2 = filter data by gid > 5;"); + myPig.registerQuery("g2 = group f2 by name;"); + myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);"); + myPig.registerQuery("store p2 into '/tmp/output2';"); + + myPig.registerQuery("f3 = filter f2 by gid > 10;"); + myPig.registerQuery("g3 = group f3 by name;"); + myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);"); + myPig.registerQuery("store p3 into '/tmp/output3';"); + + myPig.registerQuery("f4 = filter f3 by gid < 20;"); + myPig.registerQuery("g4 = group f4 by name;"); + myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);"); + myPig.registerQuery("store p4 into '/tmp/output4';"); + + LogicalPlan lp = checkLogicalPlan(1, 4, 27); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 4, 35); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + @Test + public void testMultiQueryJiraPig1060_2() { + + // test case: + + String INPUT_FILE = "pig-1060.txt"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("apple\t2"); + w.println("apple\t12"); + w.println("orange\t3"); + 
w.println("orange\t23"); + w.println("strawberry\t10"); + w.println("strawberry\t34"); + + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("data = load '" + INPUT_FILE + + "' as (name:chararray, gid:int);"); + myPig.registerQuery("f1 = filter data by gid < 5;"); + myPig.registerQuery("g1 = group f1 by name;"); + myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);"); + myPig.registerQuery("store p1 into '/tmp/output1';"); + + myPig.registerQuery("f2 = filter data by gid > 5;"); + myPig.registerQuery("g2 = group f2 by name;"); + myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);"); + myPig.registerQuery("store p2 into '/tmp/output2';"); + + myPig.registerQuery("f3 = filter f2 by gid > 10;"); + myPig.registerQuery("g3 = group f3 by name;"); + myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);"); + myPig.registerQuery("store p3 into '/tmp/output3';"); + + myPig.registerQuery("f4 = filter f3 by gid < 20;"); + myPig.registerQuery("g4 = group f4 by name;"); + myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);"); + myPig.registerQuery("store p4 into '/tmp/output4';"); + + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(4, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + @Test public void testMultiQueryJiraPig920() { // test case: a simple diamond query @@ -485,6 +614,10 @@ List<ExecJob> jobs = myPig.executeBatch(); assertTrue(jobs.size() == 2); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + } catch (Exception e) { e.printStackTrace(); Assert.fail(); @@ -607,7 +740,11 
@@ myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);"); myPig.registerQuery("store d2 into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -740,8 +877,12 @@ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); myPig.registerQuery("store d2 into '/tmp/output3';"); - myPig.executeBatch(); - + List<ExecJob> jobs = myPig.executeBatch(); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + } catch (Exception e) { e.printStackTrace(); Assert.fail(); @@ -808,7 +949,12 @@ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); myPig.registerQuery("store d2 into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(3, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -876,7 +1022,12 @@ myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);"); myPig.registerQuery("store d2 into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(3, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -989,7 +1140,12 @@ myPig.registerQuery("H = foreach G generate group, COUNT(A1);"); myPig.registerQuery("store H into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(3, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -1055,7 +1211,12 @@ myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);"); 
myPig.registerQuery("store g1 into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(3, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -1111,7 +1272,12 @@ myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);"); myPig.registerQuery("store e into '/tmp/output2';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertTrue(jobs.size() == 2); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -1169,7 +1335,12 @@ myPig.registerQuery("e = join c by gid, d by gid using \"repl\";"); myPig.registerQuery("store e into '/tmp/output3';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertEquals(3, jobs.size()); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -1278,7 +1449,12 @@ myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); "); myPig.registerQuery("store b into '/tmp/output2';"); - myPig.executeBatch(); + List<ExecJob> jobs = myPig.executeBatch(); + assertTrue(jobs.size() == 2); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } } catch (Exception e) { e.printStackTrace(); @@ -1596,7 +1772,7 @@ } } - + @Test public void testMultiQueryWithTwoStores() { @@ -2067,7 +2243,7 @@ Assert.fail(); } } - + /** * Test that pig calls checkOutputSpecs() method of the OutputFormat (if the * StoreFunc defines an OutputFormat as the return value of @@ -2314,6 +2490,9 @@ showPlanOperators(mrp); + System.out.println("===== Display map-reduce Plan ====="); + System.out.println(mrp.toString()); + Assert.assertEquals(expectedRoots, mrp.getRoots().size()); Assert.assertEquals(expectedLeaves, mrp.getLeaves().size()); 
Assert.assertEquals(expectedSize, mrp.size());