Author: pradeepkth
Date: Tue Jun 9 22:11:11 2009
New Revision: 783153
URL: http://svn.apache.org/viewvc?rev=783153&view=rev
Log:
Multiquery optimization does not handle the case where the map keys in the
split plans have different key types (tuple and non tuple key type) (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 9 22:11:11 2009
@@ -28,6 +28,10 @@
BUG FIXES
+PIG-835: Multiquery optimization does not handle the case where the map keys
+in the split plans have different key types (tuple and non tuple key type)
+(pradeepkth)
+
Release 0.3.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Tue Jun 9 22:11:11 2009
@@ -35,6 +35,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -420,12 +421,28 @@
lr.setKeyType(DataType.TUPLE);
}
} else if (leaf instanceof POSplit) {
+ // if the map plan that we are trying to merge
+ // has a split, we need to update the indices of
+ // the POLocalRearrange operators in the inner plans
+ // of the split to be a continuation of the index
+ // number sequence we are currently at.
+ // So for example, if we are in the MapRedOper
+ // we are currently processing, if the index is currently
+ // at 1 (meaning index 0 was used for a map plan
+ // merged earlier), then we want the POLocalRearrange
+ // operators in the split to have indices 1, 2 ...
+ // essentially we are flattening the index numbers
+ // across all POLocalRearranges in all merged map plans
+ // including nested ones in POSplit
POSplit spl = (POSplit)leaf;
curIndex = setIndexOnLRInSplit(index, spl);
}
splitOp.addPlan(pl);
-
+
+ // return the updated index after setting index
+ // on all POLocalRearranges including ones
+ // in inner plans of any POSplit operators
return curIndex;
}
@@ -439,7 +456,14 @@
PhysicalOperator leaf = pl.getLeaves().get(0);
if (leaf instanceof POLocalRearrange) {
POLocalRearrange lr = (POLocalRearrange)leaf;
- try {
+ try {
+ // if the baseindex is set on the demux, then
+ // POLocalRearranges in its inner plan should really
+ // be sending an index out by adding the base index
+ // This is because we would be replicating the demux
+ // as many times as there are inner plans in the demux
+ // hence the index coming out of POLocalRearranges
+ // needs to be adjusted accordingly
lr.setMultiQueryIndex(initial + lr.getIndex());
} catch (ExecException e) {
int errCode = 2136;
@@ -474,7 +498,7 @@
}
private void mergeOneReducePlanWithIndex(PhysicalPlan from,
- PhysicalPlan to, int initial, int current) throws VisitorException
{
+ PhysicalPlan to, int initial, int current, byte mapKeyType) throws
VisitorException {
POPackage pk = (POPackage)from.getRoots().get(0);
from.remove(pk);
@@ -483,7 +507,7 @@
// with the new indexed key
Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
pk.getKeyInfo();
if (keyInfo != null && keyInfo.size() > 0) {
- byte b = (byte)(initial | 0x80);
+ byte b = (byte)(initial | PigNullableWritable.mqFlag);
keyInfo.put(new Integer(b), keyInfo.get(0));
}
@@ -505,7 +529,7 @@
PODemux demux = (PODemux)to.getLeaves().get(0);
for (int i=initial; i<current; i++) {
- demux.addPlan(from);
+ demux.addPlan(from, mapKeyType);
}
if (demux.isSameMapKeyType()) {
@@ -516,7 +540,7 @@
}
private void mergeOneCombinePlanWithIndex(PhysicalPlan from,
- PhysicalPlan to, int initial, int current) throws VisitorException
{
+ PhysicalPlan to, int initial, int current, byte mapKeyType) throws
VisitorException {
POPackage cpk = (POPackage)from.getRoots().get(0);
from.remove(cpk);
@@ -550,7 +574,20 @@
setBaseIndexOnDemux(initial, locDemux);
}
- POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+
+ // if current > initial + 1, it means we had
+ // a split in the map of the MROper we are trying to
+ // merge. In that case we would have changed the indices
+ // of the POLocalRearranges in the split to be in the
+ // range initial to current. To handle key, value pairs
+ // coming out of those POLocalRearranges, we replicate
+ // the Package as many times as needed (in this case, the package
+ // would have to be a POMultiQueryPackage since we had
+ // a POSplit in the map). That Package would have a baseindex
+ // correctly set (in the beginning of this method) and would
+ // be able to handle the outputs from the different
+ // POLocalRearranges.
for (int i=initial; i<current; i++) {
pkg.addPackage(cpk);
}
@@ -561,9 +598,12 @@
}
pkg.setKeyType(cpk.getKeyType());
-
+
+ // See comment above for why we replicated the Package
+ // in the from plan - for the same reason, we replicate
+ // the Demux operators now.
for (int i=initial; i<current; i++) {
- demux.addPlan(from);
+ demux.addPlan(from, mapKeyType);
}
}
@@ -600,7 +640,7 @@
boolean sameKeyType = hasSameMapKeyType(mergeList);
- log.info("Splittees have the same key type: " + sameKeyType);
+ log.debug("Splittees have the same key type: " + sameKeyType);
// create a new reduce plan that will be the container
// for the multiple reducer plans of the MROpers in the mergeList
@@ -611,13 +651,17 @@
PhysicalPlan comPl = needCombiner(mergeList) ?
createDemuxPlan(sameKeyType, true) : null;
- log.info("Splittees have combiner: " + (comPl != null));
+ log.debug("Splittees have combiner: " + (comPl != null));
int index = 0;
for (MapReduceOper mrOp : mergeList) {
- // merge the map plan
+ // merge the map plan - this will recursively
+ // set index on all POLocalRearranges encountered
+ // including ones in inner plans of any POSplit
+ // operators. Hence the index returned could be
+ // > index + 1
int incIndex = mergeOneMapPlanWithIndex(
mrOp.mapPlan, splitOp, index, sameKeyType);
@@ -625,7 +669,7 @@
if (comPl != null) {
if (!mrOp.combinePlan.isEmpty()) {
mergeOneCombinePlanWithIndex(
- mrOp.combinePlan, comPl, index, incIndex);
+ mrOp.combinePlan, comPl, index, incIndex,
mrOp.mapKeyType);
} else {
int errCode = 2141;
String msg = "Internal Error. Cannot merge non-combiner
with combiners for optimization.";
@@ -635,7 +679,7 @@
// merge the reducer plan
mergeOneReducePlanWithIndex(
- mrOp.reducePlan, redPl, index, incIndex);
+ mrOp.reducePlan, redPl, index, incIndex, mrOp.mapKeyType);
index = incIndex;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
Tue Jun 9 22:11:11 2009
@@ -17,8 +17,12 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
/**
@@ -33,5 +37,23 @@
public MROperPlan() {
// TODO Auto-generated constructor stub
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ MRPrinter printer = new MRPrinter(ps, this);
+ printer.setVerbose(true);
+ try {
+ printer.visit();
+ } catch (VisitorException e) {
+ // TODO Auto-generated catch block
+ throw new RuntimeException("Unable to get String representation of
plan:" + e );
+ }
+ return baos.toString();
+ }
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
Tue Jun 9 22:11:11 2009
@@ -30,6 +30,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
@@ -73,7 +74,18 @@
* The list of sub-plans the inner plan is composed of
*/
private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
-
+
+ /**
+ * If the POLocalRearranges corresponding to the reduce plans in
+ * myPlans (the list of inner plans of the demux) have different key types
+ * then the MultiQueryOptimizer converts all the keys to be of type tuple
+ * by wrapping any non-tuple keys into Tuples (keys which are already
tuples
+ * are left alone).
+ * The list below is a list of booleans indicating whether extra tuple
wrapping
+ * was done for the key in the corresponding POLocalRearranges and if we
need
+ * to "unwrap" the tuple to get to the key
+ */
+ private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
/*
* Flag indicating when a new pull should start
*/
@@ -158,7 +170,7 @@
@Override
public String name() {
- return "Demux - " + mKey.toString();
+ return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " +
mKey.toString();
}
@Override
@@ -203,9 +215,13 @@
*
* @param inPlan plan to be appended to the inner plan list
*/
- public void addPlan(PhysicalPlan inPlan) {
+ public void addPlan(PhysicalPlan inPlan, byte mapKeyType) {
myPlans.add(inPlan);
processedSet.set(myPlans.size()-1);
+ // if mapKeyType is already a tuple, we will NOT
+ // be wrapping it in an extra tuple. If it is not
+ // a tuple, we will wrap it in a tuple
+ isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
}
@Override
@@ -259,8 +275,7 @@
if (res.returnStatus == POStatus.STATUS_EOP) {
getNext = true;
}
-
- return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+ return (res.returnStatus == POStatus.STATUS_OK || res.returnStatus ==
POStatus.STATUS_ERR) ? res : empty;
}
private Result getStreamCloseResult() throws ExecException {
@@ -334,10 +349,10 @@
int index = key.getIndex();
index &= idxPart;
index -= baseIndex;
-
+
PhysicalPlan pl = myPlans.get(index);
if (!(pl.getRoots().get(0) instanceof PODemux)) {
- if (!sameMapKeyType & !inCombiner) {
+ if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
Tuple tup = (Tuple)key.getValueAsPigType();
res.set(0, tup.get(0));
} else {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
Tue Jun 9 22:11:11 2009
@@ -36,6 +36,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
@@ -177,14 +178,30 @@
}
private void setIndex(int index, boolean multiQuery) throws ExecException {
- if (index > 0x7F) {
+ if (index > PigNullableWritable.idxSpace) {
+ // indices in group and cogroup should only
+ // be in the range 0x00 to 0x7F (only 127 possible
+ // inputs)
int errCode = 1082;
String msg = multiQuery?
"Merge more than 127 map-reduce jobs not supported."
: "Cogroups with more than 127 inputs not supported.";
throw new ExecException(msg, errCode, PigException.INPUT);
} else {
- this.index = multiQuery ? (byte)(index | 0x80) : (byte)index;
+ // We could potentially be sending the (key, value) relating to
+ // multiple "group by" statements through one map reduce job
+ // in multiquery optimized execution. In this case, we want
+ // two keys which have the same content but coming from different
+ // group by operations to be treated differently so that they
+ // go to different invocations of the reduce(). To achieve this
+ // we let the index be outside the regular index space - 0x00 to
0x7F
+ // by ORing with the mqFlag bitmask which will put the index above
+ // the 0x7F value. In PigNullableWritable.compareTo if the index is
+ // in this "multiquery" space, we also consider the index when
comparing
+ // two PigNullableWritables and not just the contents. Keys with
same
+ // contents coming from different "group by" operations would have
different
+ // indices and hence would go to different invocation of reduce()
+ this.index = multiQuery ? (byte)(index |
PigNullableWritable.mqFlag) : (byte)index;
}
lrOutput.set(0, new Byte(this.index));
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
Tue Jun 9 22:11:11 2009
@@ -109,7 +109,7 @@
@Override
public String name() {
- return "MultiQuery Package - " + getOperatorKey().toString();
+ return "MultiQuery Package[" + baseIndex +"] - " +
getOperatorKey().toString();
}
@Override
@@ -187,7 +187,16 @@
Tuple tuple = (Tuple)res.result;
- // replace the wrapped value in the key with the key itself
+ // the key present in the first field
+ // of the tuple above is the real key without
+ // index information - this is because the
+ // package above, extracts the real key out of
+ // the PigNullableWritable key - we are going to
+ // give this result tuple to a PODemux operator
+ // which needs a PigNullableWritable key so
+ // it can figure out the index - we already have
+ // the PigNullableWritable key cached in "myKey"
+ // let's send this in the result tuple
tuple.set(0, myKey);
return res;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Tue
Jun 9 22:11:11 2009
@@ -36,9 +36,18 @@
*/
public abstract class PigNullableWritable implements WritableComparable {
- private static byte mqFlag = (byte)0x80;
+ /**
+ * indices in multiquery optimized maps
+ * will have the Most Significant Bit set
+ * This is a bitmask used in those cases.
+ */
+ public static final byte mqFlag = (byte)0x80;
- private static byte idxSpace = (byte)0x7F;
+ /**
+ * regular indices used in group and cogroup
+ * can only go from 0x00 to 0x7F
+ */
+ public static final byte idxSpace = (byte)0x7F;
private boolean mNull;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=783153&r1=783152&r2=783153&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Jun 9
22:11:11 2009
@@ -22,7 +22,10 @@
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -39,6 +42,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -456,6 +460,55 @@
}
@Test
+ public void testMultiQueryPhase3WithDifferentMapDataTypes3() {
+
+ System.out.println("===== multi-query phase 3 with different map
datatypes (3) =====");
+
+ try {
+ myPig.setBatchOn();
+ String[] inputData = {"john\t20\t3.4",
+ "john\t25\t3.4" ,
+ "henry\t23\t3.9" ,
+ "adam\t54\t2.9" ,
+ "henry\t21\t3.9"};
+ Util.createInputFile(cluster, "queryInput.txt", inputData);
+
+ myPig.registerQuery("a = load 'queryInput.txt' " +
+ "as (name:chararray, age:int, gpa:double);");
+ myPig.registerQuery("b = group a all;");
+ myPig.registerQuery("c = foreach b generate group, COUNT(a);");
+ myPig.registerQuery("store c into 'foo';");
+ myPig.registerQuery("d = group a by (name, gpa);");
+ myPig.registerQuery("e = foreach d generate flatten(group),
MIN(a.age);");
+ myPig.registerQuery("store e into 'bar';");
+
+ myPig.executeBatch();
+
+ myPig.registerQuery("a = load 'foo' as (grp:chararray, cnt:long)
;");
+ Iterator<Tuple> it = myPig.openIterator("a");
+ assertEquals(Util.getPigConstant("('all', 5l)"), it.next());
+ assertFalse(it.hasNext());
+
+ myPig.registerQuery("a = load 'bar' as (name:chararray,
gpa:double, age:int);");
+ it = myPig.openIterator("a");
+ int i = 0;
+ Map<String, Tuple> expectedResults = new HashMap<String, Tuple>();
+ expectedResults.put("john", (Tuple)
Util.getPigConstant("('john',3.4,20)"));
+ expectedResults.put("adam", (Tuple)
Util.getPigConstant("('adam',2.9,54)"));
+ expectedResults.put("henry", (Tuple)
Util.getPigConstant("('henry',3.9,21)"));
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ i++;
+ assertEquals(expectedResults.get(t.get(0)), t);
+ }
+ assertEquals(3, i);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryPhase3StreamingInReducer() {
System.out.println("===== multi-query phase 3 with streaming in
reducer =====");