Author: daijy
Date: Wed Nov 18 19:01:21 2009
New Revision: 881879
URL: http://svn.apache.org/viewvc?rev=881879&view=rev
Log:
PIG-1060: MultiQuery optimization throws error for multi-level splits
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Nov 18 19:01:21 2009
@@ -184,7 +184,9 @@
PIG-1001: Generate more meaningful error message when one input file does not
exist (daijy)
-Release 0.5.0 - Unreleased
+PIG-1060: MultiQuery optimization throws error for multi-level splits (rding
via daijy)
+
+Release 0.5.0
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Wed Nov 18 19:01:21 2009
@@ -407,11 +407,16 @@
NoopFilterRemover fRem = new NoopFilterRemover(plan);
fRem.visit();
- // reduces the number of MROpers in the MR plan generated
- // by multi-query (multi-store) script.
- MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
- mqOptimizer.visit();
-
+ boolean isMultiQuery =
+
"true".equalsIgnoreCase(pc.getProperties().getProperty("opt.multiquery","true"));
+
+ if (isMultiQuery) {
+ // reduces the number of MROpers in the MR plan generated
+ // by multi-query (multi-store) script.
+ MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+ mqOptimizer.visit();
+ }
+
// removes unnecessary stores (as can happen with splits in
// some cases.). This has to run after the MultiQuery and
// NoopFilterRemover.
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Wed Nov 18 19:01:21 2009
@@ -621,58 +621,7 @@
// in inner plans of any POSplit operators
return curIndex;
}
-
- private int setBaseIndexOnDemux(int initial, PODemux demuxOp)
- throws VisitorException {
- int index = initial;
- demuxOp.setBaseIndex(index++);
-
- List<PhysicalPlan> pls = demuxOp.getPlans();
- for (PhysicalPlan pl : pls) {
- PhysicalOperator leaf = pl.getLeaves().get(0);
- if (leaf instanceof POLocalRearrange) {
- POLocalRearrange lr = (POLocalRearrange)leaf;
- try {
- // if the baseindex is set on the demux, then
- // POLocalRearranges in its inner plan should really
- // be sending an index out by adding the base index
- // This is because we would be replicating the demux
- // as many times as there are inner plans in the demux
- // hence the index coming out of POLocalRearranges
- // needs to be adjusted accordingly
- lr.setMultiQueryIndex(initial + lr.getIndex());
- } catch (ExecException e) {
- int errCode = 2136;
- String msg = "Internal Error. Unable to set multi-query
index for optimization.";
- throw new OptimizerException(msg, errCode,
PigException.BUG, e);
- }
- }
- PhysicalOperator root = pl.getRoots().get(0);
- if (root instanceof PODemux) {
- index = setBaseIndexOnDemux(index, (PODemux)root);
- } else {
- index++;
- }
- }
- return index;
- }
-
- private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) {
- int index = initial;
- pkgOp.setBaseIndex(index++);
-
- List<POPackage> pkgs = pkgOp.getPackages();
- for (POPackage pkg : pkgs) {
- if (pkg instanceof POMultiQueryPackage) {
- POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg;
- index = setBaseIndexOnPackage(index, mpkg);
- } else {
- index++;
- }
- }
- return index;
- }
-
+
private void mergeOneReducePlanWithIndex(PhysicalPlan from,
PhysicalPlan to, int initial, int current, byte mapKeyType) throws
VisitorException {
POPackage pk = (POPackage)from.getRoots().get(0);
@@ -684,41 +633,55 @@
// with the new indexed key
addShiftedKeyInfoIndex(initial, pk);
}
+
+ int total = current - initial;
+ POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ int pkCount = 0;
if (pk instanceof POMultiQueryPackage) {
- POMultiQueryPackage mpkg = (POMultiQueryPackage)pk;
- setBaseIndexOnPackage(initial, mpkg);
- // we should update the keyinfo map of the
- // POPackage objects in the POMultiQueryPackage to
- // have the shifted index - The index now will be
- // starting from "initial" going up to "current"
- // ORed with the multi query bit mask
- int retIndex = addShiftedKeyInfoIndex(initial, current, mpkg);
- if(retIndex != current) {
- int errCode = 2146;
- String msg = "Internal Error. Inconsistency in key index found
during optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
+ List<POPackage> pkgs = ((POMultiQueryPackage)pk).getPackages();
+ for (POPackage p : pkgs) {
+ pkg.addPackage(p);
+ pkCount++;
}
- }
-
- PhysicalOperator root = from.getRoots().get(0);
- if (root instanceof PODemux) {
- PODemux demux = (PODemux)root;
- setBaseIndexOnDemux(initial, demux);
- }
-
- POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
- for (int i=initial; i<current; i++) {
+ addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
+ } else {
pkg.addPackage(pk);
+ pkCount = 1;
}
+ if (pkCount != total) {
+ int errCode = 2146;
+ String msg = "Internal Error. Inconsistency in key index found
during optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+
boolean[] keyPos = pk.getKeyPositionsInTuple();
PODemux demux = (PODemux)to.getLeaves().get(0);
- for (int i=initial; i<current; i++) {
+ int plCount = 0;
+ PhysicalOperator root = from.getRoots().get(0);
+ if (root instanceof PODemux) {
+ // flattening the inner plans of the demux operator.
+ // This is based on the fact that if a plan has a demux
+ // operator, then it's the only operator in the plan.
+ List<PhysicalPlan> pls = ((PODemux)root).getPlans();
+ for (PhysicalPlan pl : pls) {
+ demux.addPlan(pl, keyPos);
+ plCount++;
+ }
+ demux.addIsKeyWrappedList(((PODemux)root).getIsKeyWrappedList());
+ } else {
demux.addPlan(from, mapKeyType, keyPos);
+ plCount = 1;
+ }
+
+ if (plCount != total) {
+ int errCode = 2146;
+ String msg = "Internal Error. Inconsistency in key index found
during optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
}
-
+
if (demux.isSameMapKeyType()) {
pkg.setKeyType(pk.getKeyType());
} else {
@@ -772,11 +735,7 @@
*/
private int addShiftedKeyInfoIndex(int initialIndex, int onePastEndIndex,
POMultiQueryPackage mpkg) throws OptimizerException {
- // recursively iterate over the packages in the
- // POMultiQueryPackage adding a shifted keyInfoIndex entry
- // in the packages in order going from initialIndex upto
- // onePastEndIndex (exclusive) flattening out any nested
- // packages in nested POMultiqueryPackages as we traverse
+
List<POPackage> pkgs = mpkg.getPackages();
// if we have lesser pkgs than (onePastEndIndex - initialIndex)
// its because one or more of the pkgs is a POMultiQueryPackage which
@@ -794,12 +753,8 @@
int curIndex = initialIndex;
while (i < end) {
POPackage pkg = pkgs.get(i);
- if(pkg instanceof POMultiQueryPackage) {
- curIndex = addShiftedKeyInfoIndex(curIndex, onePastEndIndex,
(POMultiQueryPackage)pkg);
- } else {
- addShiftedKeyInfoIndex(curIndex, pkg);
- curIndex++;
- }
+ addShiftedKeyInfoIndex(curIndex, pkg);
+ curIndex++;
i++;
}
return curIndex; // could be used in a caller who recursively called
this function
@@ -810,37 +765,11 @@
PhysicalPlan to, int initial, int current, byte mapKeyType) throws
VisitorException {
POPackage cpk = (POPackage)from.getRoots().get(0);
from.remove(cpk);
-
- if (cpk instanceof POMultiQueryPackage) {
- POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk;
- setBaseIndexOnPackage(initial, mpkg);
- }
PODemux demux = (PODemux)to.getLeaves().get(0);
boolean isSameKeyType = demux.isSameMapKeyType();
- PhysicalOperator leaf = from.getLeaves().get(0);
- if (leaf instanceof POLocalRearrange) {
- POLocalRearrange clr = (POLocalRearrange)leaf;
- try {
- clr.setMultiQueryIndex(initial);
- } catch (ExecException e) {
- int errCode = 2136;
- String msg = "Internal Error. Unable to set multi-query index
for optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG,
e);
- }
-
- // change the map key type to tuple when
- // multiple splittees have different map key types
- if (!isSameKeyType) {
- clr.setKeyType(DataType.TUPLE);
- }
- } else if (leaf instanceof PODemux) {
- PODemux locDemux = (PODemux)leaf;
- setBaseIndexOnDemux(initial, locDemux);
- }
-
POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
// if current > initial + 1, it means we had
@@ -848,17 +777,35 @@
// merge. In that case we would have changed the indices
// of the POLocalRearranges in the split to be in the
// range initial to current. To handle key, value pairs
- // coming out of those POLocalRearranges, we replicate
- // the Package as many times (in this case, the package
- // would have to be a POMultiQueryPackage since we had
- // a POSplit in the map). That Package would have a baseindex
- // correctly set (in the beginning of this method) and would
- // be able to handle the outputs from the different
+ // coming out of those POLocalRearranges, we add
+ // the Packages in the 'from' POMultiQueryPackage (in this case,
+ // it has to be a POMultiQueryPackage since we had
+ // a POSplit in the map) to the 'to' POMultiQueryPackage.
+ // These Packages would have correct positions in the package
+ // list and would be able to handle the outputs from the different
// POLocalRearranges.
- for (int i=initial; i<current; i++) {
+ int total = current - initial;
+ int pkCount = 0;
+ if (cpk instanceof POMultiQueryPackage) {
+ List<POPackage> pkgs = ((POMultiQueryPackage)cpk).getPackages();
+ for (POPackage p : pkgs) {
+ pkg.addPackage(p);
+ if (!isSameKeyType) {
+ p.setKeyType(DataType.TUPLE);
+ }
+ pkCount++;
+ }
+ } else {
pkg.addPackage(cpk);
+ pkCount = 1;
}
-
+
+ if (pkCount != total) {
+ int errCode = 2146;
+ String msg = "Internal Error. Inconsistency in key index found
during optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+
// all packages should have the same key type
if (!isSameKeyType) {
cpk.setKeyType(DataType.TUPLE);
@@ -868,11 +815,52 @@
boolean[] keyPos = cpk.getKeyPositionsInTuple();
- // See comment above for why we replicated the Package
- // in the from plan - for the same reason, we replicate
- // the Demux operators now.
- for (int i=initial; i<current; i++) {
+ // See comment above for why we flatten the Packages
+ // in the from plan - for the same reason, we flatten
+ // the inner plans of Demux operator now.
+ int plCount = 0;
+ PhysicalOperator leaf = from.getLeaves().get(0);
+ if (leaf instanceof PODemux) {
+ List<PhysicalPlan> pls = ((PODemux)leaf).getPlans();
+ for (PhysicalPlan pl : pls) {
+ demux.addPlan(pl, mapKeyType, keyPos);
+ POLocalRearrange lr = (POLocalRearrange)pl.getLeaves().get(0);
+ try {
+ lr.setMultiQueryIndex(initial + plCount++);
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query
index for optimization.";
+ throw new OptimizerException(msg, errCode,
PigException.BUG, e);
+ }
+
+ // change the map key type to tuple when
+ // multiple splittees have different map key types
+ if (!isSameKeyType) {
+ lr.setKeyType(DataType.TUPLE);
+ }
+ }
+ } else {
demux.addPlan(from, mapKeyType, keyPos);
+ POLocalRearrange lr = (POLocalRearrange)from.getLeaves().get(0);
+ try {
+ lr.setMultiQueryIndex(initial + plCount++);
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query index
for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG,
e);
+ }
+
+ // change the map key type to tuple when
+ // multiple splittees have different map key types
+ if (!isSameKeyType) {
+ lr.setKeyType(DataType.TUPLE);
+ }
+ }
+
+ if (plCount != total) {
+ int errCode = 2146;
+ String msg = "Internal Error. Inconsistency in key index found
during optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
}
}
@@ -933,73 +921,7 @@
// > index + 1
int incIndex = mergeOneMapPlanWithIndex(
mrOp.mapPlan, splitOp, index, sameKeyType);
-
- // In the combine and reduce plans the Demux and
POMultiQueryPackage
- // operators' baseIndex is set whenever the incIndex above is >
index + 1
- // What does this 'baseIndex' mean - here is an attempt to explain
it:
- // Consider a map - reduce plan layout as shown below (the comments
- // apply even if a combine plan was present) - An explanation of
the "index"
- // and "baseIndex" fields follows:
- // The numbers in parenthesis () are "index" values - Note that in
multiquery
- // optimizations these indices are actually ORed with a bitmask
(0x80) on the
- // map side in the LocalRearrange. The POMultiQueryPackage and
PODemux operators
- // then remove this bitmask to restore the original index values.
In the commentary
- // below, indices will be referred to without this bitmask - the
bitmask is only
- // to identify the multiquery optimization during comparison - for
details see the comments
details see the comments
- // in POLocalRearrange.setIndex().
- // The numbers in brackets [] are "baseIndex" values. These
baseIndex values are
- // used by POMultiQueryPackage and PODemux to calculate an
arrayList index which
- // they use to pick the right package or inner plan respectively.
All this is needed
- // since on the map side the indices are assigned after flattening
all POLocalRearranges
- // including those nested in Splits into one flat space (as can be
noticed by the
- // numbering of the indices under the split in the example below).
The optimizer then
- // duplicates the POMultiQueryPackage and Demux inner plan
corresponding to the split
- // in the reduce plan (the entities with * in the figure below).
Now if a key with index '1'
- // is emitted, it goes to the first POMultiQueryPackage in the
reduce plan below, which
- // then picks the second package in its arraylist of packages
which is a
- // POMultiQueryPackage (the first with a * below). This
POMultiQueryPackage then picks
- // the first package in its list (it arrives at this arraylist
index of 0 by subtracting
- // the baseIndex (1) from the index coming in (1)). So the record
emitted by LocalRearrange(1)
- // reaches the right package. Likewise, if LocalRearrange(2) emits
a key,value they would have
- // an index 2. The first POMultiQueryPackage (with baseIndex 0
below) would pick the 3rd package
- // (arraylist index 2 arrived at by doing index - baseIndex which
is 2 - 0). This is the
- // duplicated POMultiQueryPackage with baseIndex 1. This would
in turn pick the second package
- // in its arraylist (arraylist index 1 arrived at by doing index -
baseIndex which is 2 - 1)
- // The idea is that through duplication we make it easier to
determine which POPackage to pick.
- // The exact same logic is used by PODemux to pick the right inner
plan from its arraylist of
- // inner plans.
-
- // The arrows in the figure below show the correspondence between
the different operators
- // and plans .i.e the end of an arrow points to the operator or
plan which consumes
- // data coming from the root of the arrow
-
- // If you look at the layout below column-wise, each "column" is a
MROper
- // which is being merged into the parent MROper - the Split
represents a
- // MROper which inside of it has 2 MROpers merged in.
- // A star (*) next to an operator means it is the same object as
the
- // other operator with a star(*) - Essentially the same object
reference
- // is copied twice.
- // LocalRearrange(0) Split
LocalRearrange(3)
- // | / \
|
- // | LocalRearrange(1) LocalRearrange(2)
|
- // | | MAP PLAN |
|
- //
----|-------------|---------------------------|-------------------|--------------------
- // | | REDUCE PLAN |
|
- // | | POMultiQueryPackage[0] |
|
- // V V | contains V
V
- // [ POPackage, POMultiQueryPackage[1]*,POMultiQueryPackage[1]*,
POPackage]
- // | / \ / \
|
- // | POPackage POPackage POPackage POPackage
|
- // | \ |
|
- // | \ Demux[0] |
|
- // V V | contains V
V
- // [ planOfRedOperators,planWithDemux*, planWithDemux*,
planOfRedOperators]
- // / | | \
- // / Demux[1] Demux[1] \
- // V | | V
- //
[planOfRedOps,planOfRedOps][planOfRedOps,planOfRedOps]
- //
-
+
// merge the combiner plan
if (comPl != null) {
if (!mrOp.combinePlan.isEmpty()) {
@@ -1011,6 +933,7 @@
throw new OptimizerException(msg, errCode,
PigException.BUG);
}
}
+
// merge the reducer plan
mergeOneReducePlanWithIndex(
mrOp.reducePlan, redPl, index, incIndex, mrOp.mapKeyType);
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
Wed Nov 18 19:01:21 2009
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -61,14 +62,7 @@
private static Result eop = new Result(POStatus.STATUS_EOP, null);
transient private Log log = LogFactory.getLog(getClass());
-
- /*
- * The base index of this demux. In the case of
- * a demux contained in another demux, the index
- * passed in must be shifted before it can be used.
- */
- private int baseIndex = 0;
-
+
/*
* The list of sub-plans the inner plan is composed of
*/
@@ -178,7 +172,7 @@
@Override
public String name() {
- return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " +
mKey.toString();
+ return "Demux" + isKeyWrapped + " - " + mKey.toString();
}
@Override
@@ -190,32 +184,35 @@
public boolean supportsMultipleOutputs() {
return false;
}
-
+
/**
- * Sets the base index of this demux.
- *
- * @param idx the base index
+ * Returns the list of inner plans.
+ *
+ * @return the list of the nested plans
*/
- public void setBaseIndex(int idx) {
- baseIndex = idx;
+ public List<PhysicalPlan> getPlans() {
+ return myPlans;
}
/**
- * Returns the base index of this demux
+ * Returns the list of booleans that indicates if the
+ * key needs to unwrapped for the corresponding plan.
*
- * @return the base index
+ * @return the list of isKeyWrapped boolean values
*/
- public int getBaseIndex() {
- return baseIndex;
+ public List<Boolean> getIsKeyWrappedList() {
+ return Collections.unmodifiableList(isKeyWrapped);
}
/**
- * Returns the list of inner plans.
- *
- * @return the list of the nested plans
+ * Adds a list of IsKeyWrapped boolean values
+ *
+ * @param lst the list of boolean values to add
*/
- public List<PhysicalPlan> getPlans() {
- return myPlans;
+ public void addIsKeyWrappedList(List<Boolean> lst) {
+ for (Boolean b : lst) {
+ isKeyWrapped.add(b);
+ }
}
/**
@@ -232,6 +229,12 @@
isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
keyPositions.add(keyPos);
}
+
+ public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {
+ myPlans.add(inPlan);
+ processedSet.set(myPlans.size()-1);
+ keyPositions.add(keyPos);
+ }
@Override
public Result getNext(Tuple t) throws ExecException {
@@ -357,8 +360,7 @@
// the POLocalRearrange operator and passed to this operator
// by POMultiQueryPackage
int index = fld.getIndex();
- index &= idxPart;
- index -= baseIndex;
+ index &= idxPart;
PhysicalPlan pl = myPlans.get(index);
if (!(pl.getRoots().get(0) instanceof PODemux)) {
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
Wed Nov 18 19:01:21 2009
@@ -64,8 +64,6 @@
private List<POPackage> packages = new ArrayList<POPackage>();
transient private PigNullableWritable myKey;
-
- private int baseIndex = 0;
/**
* Constructs an operator with the specified key.
@@ -111,7 +109,7 @@
@Override
public String name() {
- return "MultiQuery Package[" + baseIndex +"] - " +
getOperatorKey().toString();
+ return "MultiQuery Package - " + getOperatorKey().toString();
}
@Override
@@ -174,7 +172,6 @@
int index = (int)origIndex;
index &= idxPart;
- index -= baseIndex;
if (index >= packages.size() || index < 0) {
int errCode = 2140;
@@ -221,21 +218,4 @@
return res;
}
- /**
- * Sets the base index of this operator
- *
- * @param baseIndex the base index of this operator
- */
- public void setBaseIndex(int baseIndex) {
- this.baseIndex = baseIndex;
- }
-
- /**
- * Returns the base index of this operator
- *
- * @return the base index of this operator
- */
- public int getBaseIndex() {
- return baseIndex;
- }
}
Modified:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
(original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
Wed Nov 18 19:01:21 2009
@@ -34,22 +34,28 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.StoreConfig;
import org.apache.pig.StoreFunc;
-import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.util.ExecTools;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -57,20 +63,12 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.util.Progressable;
public class TestMultiQuery extends TestCase {
@@ -91,6 +89,137 @@
}
@Test
+ public void testMultiQueryJiraPig1060() {
+
+ // test case:
+
+ String INPUT_FILE = "pig-1060.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\t2");
+ w.println("apple\t12");
+ w.println("orange\t3");
+ w.println("orange\t23");
+ w.println("strawberry\t10");
+ w.println("strawberry\t34");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE +
+ "' as (name:chararray, gid:int);");
+ myPig.registerQuery("f1 = filter data by gid < 5;");
+ myPig.registerQuery("g1 = group f1 by name;");
+ myPig.registerQuery("p1 = foreach g1 generate group,
COUNT(f1.gid);");
+ myPig.registerQuery("store p1 into '/tmp/output1';");
+
+ myPig.registerQuery("f2 = filter data by gid > 5;");
+ myPig.registerQuery("g2 = group f2 by name;");
+ myPig.registerQuery("p2 = foreach g2 generate group,
COUNT(f2.gid);");
+ myPig.registerQuery("store p2 into '/tmp/output2';");
+
+ myPig.registerQuery("f3 = filter f2 by gid > 10;");
+ myPig.registerQuery("g3 = group f3 by name;");
+ myPig.registerQuery("p3 = foreach g3 generate group,
COUNT(f3.gid);");
+ myPig.registerQuery("store p3 into '/tmp/output3';");
+
+ myPig.registerQuery("f4 = filter f3 by gid < 20;");
+ myPig.registerQuery("g4 = group f4 by name;");
+ myPig.registerQuery("p4 = foreach g4 generate group,
COUNT(f4.gid);");
+ myPig.registerQuery("store p4 into '/tmp/output4';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 4, 27);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 4, 35);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig1060_2() {
+
+ // test case:
+
+ String INPUT_FILE = "pig-1060.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\t2");
+ w.println("apple\t12");
+ w.println("orange\t3");
+ w.println("orange\t23");
+ w.println("strawberry\t10");
+ w.println("strawberry\t34");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE +
+ "' as (name:chararray, gid:int);");
+ myPig.registerQuery("f1 = filter data by gid < 5;");
+ myPig.registerQuery("g1 = group f1 by name;");
+ myPig.registerQuery("p1 = foreach g1 generate group,
COUNT(f1.gid);");
+ myPig.registerQuery("store p1 into '/tmp/output1';");
+
+ myPig.registerQuery("f2 = filter data by gid > 5;");
+ myPig.registerQuery("g2 = group f2 by name;");
+ myPig.registerQuery("p2 = foreach g2 generate group,
COUNT(f2.gid);");
+ myPig.registerQuery("store p2 into '/tmp/output2';");
+
+ myPig.registerQuery("f3 = filter f2 by gid > 10;");
+ myPig.registerQuery("g3 = group f3 by name;");
+ myPig.registerQuery("p3 = foreach g3 generate group,
COUNT(f3.gid);");
+ myPig.registerQuery("store p3 into '/tmp/output3';");
+
+ myPig.registerQuery("f4 = filter f3 by gid < 20;");
+ myPig.registerQuery("g4 = group f4 by name;");
+ myPig.registerQuery("p4 = foreach g4 generate group,
COUNT(f4.gid);");
+ myPig.registerQuery("store p4 into '/tmp/output4';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(4, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
public void testMultiQueryJiraPig920() {
// test case: a simple diamond query
@@ -485,6 +614,10 @@
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -607,7 +740,11 @@
myPig.registerQuery("d2 = foreach d1 generate group,
AVG(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -740,8 +877,12 @@
myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) -
MIN(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
-
+ List<ExecJob> jobs = myPig.executeBatch();
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -808,7 +949,12 @@
myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) -
MIN(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -876,7 +1022,12 @@
myPig.registerQuery("d2 = foreach d1 generate group,
COUNT(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -989,7 +1140,12 @@
myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
myPig.registerQuery("store H into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1055,7 +1211,12 @@
myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
myPig.registerQuery("store g1 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1111,7 +1272,12 @@
myPig.registerQuery("e = foreach d generate flatten(b),
flatten(c);");
myPig.registerQuery("store e into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1169,7 +1335,12 @@
myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
myPig.registerQuery("store e into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1278,7 +1449,12 @@
myPig.registerQuery("b = load '/tmp/output1' using
PigStorage(':'); ");
myPig.registerQuery("store b into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1596,7 +1772,7 @@
}
}
-
+
@Test
public void testMultiQueryWithTwoStores() {
@@ -2067,7 +2243,7 @@
Assert.fail();
}
}
-
+
/**
* Test that pig calls checkOutputSpecs() method of the OutputFormat (if
the
* StoreFunc defines an OutputFormat as the return value of
@@ -2314,6 +2490,9 @@
showPlanOperators(mrp);
+ System.out.println("===== Display map-reduce Plan =====");
+ System.out.println(mrp.toString());
+
Assert.assertEquals(expectedRoots, mrp.getRoots().size());
Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
Assert.assertEquals(expectedSize, mrp.size());