Author: rding
Date: Wed Feb 17 18:44:52 2010
New Revision: 911147

URL: http://svn.apache.org/viewvc?rev=911147&view=rev
Log:
PIG-1194: ERROR 2055: Received Error while processing the map plan

Modified:
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=911147&r1=911146&r2=911147&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 Wed Feb 17 18:44:52 2010
@@ -20,6 +20,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -53,6 +55,10 @@
     protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
     protected static BagFactory mBagFactory = BagFactory.getInstance();
 
+    private static Log log = 
LogFactory.getLog(POPreCombinerLocalRearrange.class);
+    
+    private static final Result ERR_RESULT = new Result();
+    
     protected List<PhysicalPlan> plans;
     
     protected List<ExpressionOperator> leafOps;
@@ -115,7 +121,7 @@
     public Result getNext(Tuple t) throws ExecException {
         
         Result inp = null;
-        Result res = null;
+        Result res = ERR_RESULT;
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == 
POStatus.STATUS_ERR)
@@ -160,12 +166,23 @@
                 case DataType.TUPLE:
                     res = op.getNext(dummyTuple);
                     break;
+                default:
+                    log.error("Invalid result type: "
+                            + DataType.findType(op.getResultType()));
+                    break;
                 }
-                if(res.returnStatus!=POStatus.STATUS_OK)
+                
+                // allow null as group by key
+                if (res.returnStatus != POStatus.STATUS_OK
+                        && res.returnStatus != POStatus.STATUS_NULL) {
                     return new Result();
+                }
+                
                 resLst.add(res);
             }
             res.result = constructLROutput(resLst,(Tuple)inp.result);
+            res.returnStatus = POStatus.STATUS_OK;
+            
             return res;
         }
         return inp;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java?rev=911147&r1=911146&r2=911147&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java Wed Feb 
17 18:44:52 2010
@@ -195,22 +195,29 @@
         try {
             PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
             w.println("10\t2\t3");
+            w.println("10\t4\t5");
+            w.println("20\t3000\t2");
+            w.println("20\t4000\t3");
             w.println("20\t3\t");
+            w.println("21\t4\t");
+            w.println("22\t5\t");
             w.close();
             Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
 
             PigServer myPig = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
 
             myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, 
a2);");
-            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) 
a1) > .001 OR a0 < 11 ? a0 : -1);");
+            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) 
a1) > .001 OR a0 < 11 ? a0 : 0);");
+            myPig.registerQuery("res = FOREACH grp GENERATE group, 
SUM(data.a1), SUM(data.a2);");
             
             List<Tuple> expectedResults = 
Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(10,{(10,2,3)})",
-                            "(null,{(20,3,null)})"
+                    new String[] {   
+                            "(0,7000.0,5.0)",
+                            "(10,6.0,8.0)",                            
+                            "(null,12.0,null)"
                     });
             
-            Iterator<Tuple> iter = myPig.openIterator("grp");
+            Iterator<Tuple> iter = myPig.openIterator("res");
             int counter = 0;
             while (iter.hasNext()) {
                 assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());      


Reply via email to