Author: rding
Date: Wed Feb 17 18:44:52 2010
New Revision: 911147
URL: http://svn.apache.org/viewvc?rev=911147&view=rev
Log:
PIG-1194: ERROR 2055: Received Error while processing the map plan
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=911147&r1=911146&r2=911147&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
Wed Feb 17 18:44:52 2010
@@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -53,6 +55,10 @@
protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
protected static BagFactory mBagFactory = BagFactory.getInstance();
+ private static Log log =
LogFactory.getLog(POPreCombinerLocalRearrange.class);
+
+ private static final Result ERR_RESULT = new Result();
+
protected List<PhysicalPlan> plans;
protected List<ExpressionOperator> leafOps;
@@ -115,7 +121,7 @@
public Result getNext(Tuple t) throws ExecException {
Result inp = null;
- Result res = null;
+ Result res = ERR_RESULT;
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus ==
POStatus.STATUS_ERR)
@@ -160,12 +166,23 @@
case DataType.TUPLE:
res = op.getNext(dummyTuple);
break;
+ default:
+ log.error("Invalid result type: "
+ + DataType.findType(op.getResultType()));
+ break;
}
- if(res.returnStatus!=POStatus.STATUS_OK)
+
+ // allow null as group by key
+ if (res.returnStatus != POStatus.STATUS_OK
+ && res.returnStatus != POStatus.STATUS_NULL) {
return new Result();
+ }
+
resLst.add(res);
}
res.result = constructLROutput(resLst,(Tuple)inp.result);
+ res.returnStatus = POStatus.STATUS_OK;
+
return res;
}
return inp;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java?rev=911147&r1=911146&r2=911147&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java Wed Feb
17 18:44:52 2010
@@ -195,22 +195,29 @@
try {
PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
w.println("10\t2\t3");
+ w.println("10\t4\t5");
+ w.println("20\t3000\t2");
+ w.println("20\t4000\t3");
w.println("20\t3\t");
+ w.println("21\t4\t");
+ w.println("22\t5\t");
w.close();
Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
PigServer myPig = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1,
a2);");
- myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double)
a1) > .001 OR a0 < 11 ? a0 : -1);");
+ myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double)
a1) > .001 OR a0 < 11 ? a0 : 0);");
+ myPig.registerQuery("res = FOREACH grp GENERATE group,
SUM(data.a1), SUM(data.a2);");
List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(10,{(10,2,3)})",
- "(null,{(20,3,null)})"
+ new String[] {
+ "(0,7000.0,5.0)",
+ "(10,6.0,8.0)",
+ "(null,12.0,null)"
});
- Iterator<Tuple> iter = myPig.openIterator("grp");
+ Iterator<Tuple> iter = myPig.openIterator("res");
int counter = 0;
while (iter.hasNext()) {
assertEquals(expectedResults.get(counter++).toString(),
iter.next().toString());