Author: gates
Date: Thu Dec 3 22:56:45 2009
New Revision: 886973
URL: http://svn.apache.org/viewvc?rev=886973&view=rev
Log:
PIG-1068: COGROUP fails with 'Type mismatch in key from map: expected
org.apache.pig.impl.io.NullableText, recieved
org.apache.pig.impl.io.NullableTuple'
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec 3 22:56:45 2009
@@ -45,6 +45,10 @@
BUG FIXES
+PIG-1068: COGROUP fails with 'Type mismatch in key from map: expected
+ org.apache.pig.impl.io.NullableText, recieved
+ org.apache.pig.impl.io.NullableTuple' (rding via gates)
+
PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan)
PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Thu Dec 3 22:56:45 2009
@@ -662,9 +662,10 @@
pkg.addPackage(p);
pkCount++;
}
+
pkg.addIsKeyWrappedList(((POMultiQueryPackage)pk).getIsKeyWrappedList());
addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
} else {
- pkg.addPackage(pk);
+ pkg.addPackage(pk, mapKeyType);
pkCount = 1;
}
@@ -673,8 +674,6 @@
String msg = "Internal Error. Inconsistency in key index found
during optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG);
}
-
- boolean[] keyPos = pk.getKeyPositionsInTuple();
PODemux demux = (PODemux)to.getLeaves().get(0);
int plCount = 0;
@@ -685,12 +684,11 @@
// operator, then it's the only operator in the plan.
List<PhysicalPlan> pls = ((PODemux)root).getPlans();
for (PhysicalPlan pl : pls) {
- demux.addPlan(pl, keyPos);
+ demux.addPlan(pl);
plCount++;
}
- demux.addIsKeyWrappedList(((PODemux)root).getIsKeyWrappedList());
} else {
- demux.addPlan(from, mapKeyType, keyPos);
+ demux.addPlan(from);
plCount = 1;
}
@@ -700,11 +698,11 @@
throw new OptimizerException(msg, errCode, PigException.BUG);
}
- if (demux.isSameMapKeyType()) {
+ if (pkg.isSameMapKeyType()) {
pkg.setKeyType(pk.getKeyType());
} else {
pkg.setKeyType(DataType.TUPLE);
- }
+ }
}
private void addShiftedKeyInfoIndex(int index, POPackage pkg) throws
OptimizerException {
@@ -785,11 +783,11 @@
from.remove(cpk);
PODemux demux = (PODemux)to.getLeaves().get(0);
-
- boolean isSameKeyType = demux.isSameMapKeyType();
-
+
POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ boolean isSameKeyType = pkg.isSameMapKeyType();
+
// if current > initial + 1, it means we had
// a split in the map of the MROper we are trying to
// merge. In that case we would have changed the indices
@@ -818,6 +816,8 @@
pkCount = 1;
}
+ pkg.setSameMapKeyType(isSameKeyType);
+
if (pkCount != total) {
int errCode = 2146;
String msg = "Internal Error. Inconsistency in key index found
during optimization.";
@@ -831,8 +831,6 @@
pkg.setKeyType(cpk.getKeyType());
- boolean[] keyPos = cpk.getKeyPositionsInTuple();
-
// See comment above for why we flatten the Packages
// in the from plan - for the same reason, we flatten
// the inner plans of Demux operator now.
@@ -841,7 +839,7 @@
if (leaf instanceof PODemux) {
List<PhysicalPlan> pls = ((PODemux)leaf).getPlans();
for (PhysicalPlan pl : pls) {
- demux.addPlan(pl, mapKeyType, keyPos);
+ demux.addPlan(pl);
POLocalRearrange lr = (POLocalRearrange)pl.getLeaves().get(0);
try {
lr.setMultiQueryIndex(initial + plCount++);
@@ -858,7 +856,7 @@
}
}
} else {
- demux.addPlan(from, mapKeyType, keyPos);
+ demux.addPlan(from);
POLocalRearrange lr = (POLocalRearrange)from.getLeaves().get(0);
try {
lr.setMultiQueryIndex(initial + plCount++);
@@ -895,8 +893,8 @@
private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean
isCombiner)
throws VisitorException {
- PODemux demux = getDemux(sameKeyType, isCombiner);
- POMultiQueryPackage pkg= getMultiQueryPackage();
+ PODemux demux = getDemux(isCombiner);
+ POMultiQueryPackage pkg= getMultiQueryPackage(sameKeyType, isCombiner);
PhysicalPlan pl = new PhysicalPlan();
pl.add(pkg);
@@ -1135,14 +1133,17 @@
return new POStore(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
- private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
+ private PODemux getDemux(boolean inCombiner){
PODemux demux = new PODemux(new OperatorKey(scope,
nig.getNextNodeId(scope)));
- demux.setSameMapKeyType(sameMapKeyType);
demux.setInCombiner(inCombiner);
return demux;
}
- private POMultiQueryPackage getMultiQueryPackage(){
- return new POMultiQueryPackage(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ private POMultiQueryPackage getMultiQueryPackage(boolean sameMapKeyType,
boolean inCombiner){
+ POMultiQueryPackage pkg =
+ new POMultiQueryPackage(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ pkg.setInCombiner(inCombiner);
+ pkg.setSameMapKeyType(sameMapKeyType);
+ return pkg;
}
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
Thu Dec 3 22:56:45 2009
@@ -68,26 +68,6 @@
*/
private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
- /**
- * If the POLocalRearranges corresponding to the reduce plans in
- * myPlans (the list of inner plans of the demux) have different key types
- * then the MultiQueryOptimizer converts all the keys to be of type tuple
- * by wrapping any non-tuple keys into Tuples (keys which are already
tuples
- * are left alone).
- * The list below is a list of booleans indicating whether extra tuple
wrapping
- * was done for the key in the corresponding POLocalRearranges and if we
need
- * to "unwrap" the tuple to get to the key
- */
- private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
-
- /**
- * The list tracks the field position of the key in the input tuple so that
- * the right values are "unwrapped" to get the key.
- * The tuples emitted from POCombinerPackages always have keys in a fixed
- * position, but this position varies depending on the Pig Latin scripts.
- */
- private ArrayList<boolean[]> keyPositions = new ArrayList<boolean[]>();
-
/*
* Flag indicating when a new pull should start
*/
@@ -106,14 +86,6 @@
private PhysicalOperator curLeaf = null;
/*
- * Indicating if all the inner plans have the same
- * map key type. If not, the keys passed in are
- * wrapped inside tuples and need to be extracted
- * out during the reduce phase
- */
- private boolean sameMapKeyType = true;
-
- /*
* Indicating if this operator is in a combiner.
* If not, this operator is in a reducer and the key
* values must first be extracted from the tuple-wrap
@@ -172,7 +144,7 @@
@Override
public String name() {
- return "Demux" + isKeyWrapped + " - " + mKey.toString();
+ return "Demux [" + myPlans.size() + "] "+ mKey.toString();
}
@Override
@@ -195,45 +167,14 @@
}
/**
- * Returns the list of booleans that indicates if the
- * key needs to unwrapped for the corresponding plan.
- *
- * @return the list of isKeyWrapped boolean values
- */
- public List<Boolean> getIsKeyWrappedList() {
- return Collections.unmodifiableList(isKeyWrapped);
- }
-
- /**
- * Adds a list of IsKeyWrapped boolean values
- *
- * @param lst the list of boolean values to add
- */
- public void addIsKeyWrappedList(List<Boolean> lst) {
- for (Boolean b : lst) {
- isKeyWrapped.add(b);
- }
- }
-
- /**
* Appends the specified plan at the end of the list.
*
* @param inPlan plan to be appended to the inner plan list
*/
- public void addPlan(PhysicalPlan inPlan, byte mapKeyType, boolean[]
keyPos) {
- myPlans.add(inPlan);
- processedSet.set(myPlans.size()-1);
- // if mapKeyType is already a tuple, we will NOT
- // be wrapping it in an extra tuple. If it is not
- // a tuple, we will wrap into in a tuple
- isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
- keyPositions.add(keyPos);
- }
- public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {
+ public void addPlan(PhysicalPlan inPlan) {
myPlans.add(inPlan);
processedSet.set(myPlans.size()-1);
- keyPositions.add(keyPos);
}
@Override
@@ -364,51 +305,12 @@
PhysicalPlan pl = myPlans.get(index);
if (!(pl.getRoots().get(0) instanceof PODemux)) {
- if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
-
- // unwrap the keys
- boolean[] keys = keyPositions.get(index);
- for (int pos = 0; pos < keys.length; pos++) {
- if (keys[pos]) {
- Tuple tup = (pos == 0) ?
- (Tuple)fld.getValueAsPigType() :
(Tuple)res.get(pos);
- res.set(pos, tup.get(0));
- }
- else if (pos == 0) {
- res.set(0, fld.getValueAsPigType());
- }
- }
-
- } else {
- res.set(0, fld.getValueAsPigType());
- }
+ res.set(0, fld.getValueAsPigType());
}
myPlans.get(index).attachInput(res);
return myPlans.get(index).getLeaves().get(0);
}
-
- /**
- * Sets a flag indicating if all inner plans have
- * the same map key type.
- *
- * @param sameMapKeyType true if all inner plans have
- * the same map key type; otherwise false
- */
- public void setSameMapKeyType(boolean sameMapKeyType) {
- this.sameMapKeyType = sameMapKeyType;
- }
-
- /**
- * Returns a flag indicating if all inner plans
- * have the same map key type
- *
- * @return true if all inner plans have
- * the same map key type; otherwise false
- */
- public boolean isSameMapKeyType() {
- return sameMapKeyType;
- }
/**
* Sets a flag indicating if this operator is
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
Thu Dec 3 22:56:45 2009
@@ -18,6 +18,7 @@
package
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -28,6 +29,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.NullableUnknownWritable;
@@ -63,6 +65,34 @@
private List<POPackage> packages = new ArrayList<POPackage>();
+ /**
+ * If the POLocalRearranges corresponding to the reduce plans in
+ * myPlans (the list of inner plans of the demux) have different key types
+ * then the MultiQueryOptimizer converts all the keys to be of type tuple
+ * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+ * are left alone).
+ * The list below is a list of booleans indicating whether extra tuple wrapping
+ * was done for the key in the corresponding POLocalRearranges and if we need
+ * to "unwrap" the tuple to get to the key
+ */
+ private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+ /*
+ * Indicating if all the inner plans have the same
+ * map key type. If not, the keys passed in are
+ * wrapped inside tuples and need to be extracted
+ * out during the reduce phase
+ */
+ private boolean sameMapKeyType = true;
+
+ /*
+ * Indicating if this operator is in a combiner.
+ * If not, this operator is in a reducer and the key
+ * values must first be extracted from the tuple-wrap
+ * before writing out to the disk
+ */
+ private boolean inCombiner = false;
+
transient private PigNullableWritable myKey;
/**
@@ -109,7 +139,7 @@
@Override
public String name() {
- return "MultiQuery Package - " + getOperatorKey().toString();
+ return "MultiQuery Package [" + isKeyWrapped + "] - " +
getOperatorKey().toString();
}
@Override
@@ -148,6 +178,21 @@
public void addPackage(POPackage pack) {
packages.add(pack);
}
+
+ /**
+ * Appends the specified package object to the end of
+ * the package list.
+ *
+ * @param pack package to be appended to the list
+ * @param mapKeyType the map key type associated with the package
+ */
+ public void addPackage(POPackage pack, byte mapKeyType) {
+ packages.add(pack);
+ // if mapKeyType is already a tuple, we will NOT
+ // be wrapping it in an extra tuple. If it is not
+ // a tuple, we will wrap it in a tuple
+ isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+ }
/**
* Returns the list of packages.
@@ -179,10 +224,20 @@
+ " should be in the range between 0 and " + packages.size();
throw new ExecException(msg, errCode, PigException.BUG);
}
-
+
POPackage pack = packages.get(index);
-
- pack.attachInput(myKey, tupIter);
+
+ // check to see if we need to unwrap the key. The keys may be
+ // wrapped inside a tuple by LocalRearrange operator when jobs
+ // with different map key types are merged
+ PigNullableWritable curKey = myKey;
+ if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+ Tuple tup = (Tuple)myKey.getValueAsPigType();
+ curKey = HDataType.getWritableComparableTypes(tup.get(0),
pack.getKeyType());
+ curKey.setIndex(origIndex);
+ }
+
+ pack.attachInput(curKey, tupIter);
Result res = pack.getNext(t);
@@ -218,4 +273,41 @@
return res;
}
+ /**
+ * Returns the list of booleans that indicates if the
+ * key needs to be unwrapped for the corresponding plan.
+ *
+ * @return the list of isKeyWrapped boolean values
+ */
+ public List<Boolean> getIsKeyWrappedList() {
+ return Collections.unmodifiableList(isKeyWrapped);
+ }
+
+ /**
+ * Adds a list of IsKeyWrapped boolean values
+ *
+ * @param lst the list of boolean values to add
+ */
+ public void addIsKeyWrappedList(List<Boolean> lst) {
+ for (Boolean b : lst) {
+ isKeyWrapped.add(b);
+ }
+ }
+
+ public void setInCombiner(boolean inCombiner) {
+ this.inCombiner = inCombiner;
+ }
+
+ public boolean isInCombiner() {
+ return inCombiner;
+ }
+
+ public void setSameMapKeyType(boolean sameMapKeyType) {
+ this.sameMapKeyType = sameMapKeyType;
+ }
+
+ public boolean isSameMapKeyType() {
+ return sameMapKeyType;
+ }
+
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Thu Dec 3
22:56:45 2009
@@ -88,6 +88,67 @@
myPig = null;
}
+ public void testMultiQueryJiraPig1068() {
+
+ // test case: COGROUP fails with 'Type mismatch in key from map:
+ // expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
+
+ String INPUT_FILE = "pig-1068.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("10\tapple\tlogin\tjar");
+ w.println("20\torange\tlogin\tbox");
+ w.println("30\tstrawberry\tquit\tbot");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("logs = load '" + INPUT_FILE
+ + "' as (ts:int, id:chararray, command:chararray,
comments:chararray);");
+ myPig.registerQuery("SPLIT logs INTO logins IF command == 'login',
all_quits IF command == 'quit';");
+ myPig.registerQuery("login_info = FOREACH logins { GENERATE id as
id, comments AS client; };");
+ myPig.registerQuery("logins_grouped = GROUP login_info BY (id,
client);");
+ myPig.registerQuery("count_logins_by_client = FOREACH
logins_grouped "
+ + "{ generate group.id AS id, group.client AS client,
COUNT($1) AS count; };");
+ myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
+ myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE
FLATTEN(all_quits); };");
+ myPig.registerQuery("joined_session_info = COGROUP quits BY id,
count_logins_by_client BY id;");
+
+ Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
+
+ List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('apple',{},{('apple','jar',1L)})",
+ "('orange',{},{('orange','box',1L)})",
+
"('strawberry',{(30,'strawberry','quit','bot')},{})"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(),
iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
@Test
public void testMultiQueryJiraPig1108() {