Author: pradeepkth
Date: Tue Jun  9 22:15:00 2009
New Revision: 783156

URL: http://svn.apache.org/viewvc?rev=783156&view=rev
Log:
Multiquery optimization does not handle the case where the map keys in the 
split plans have different key types (tuple and non tuple key type) (pradeepkth)

Modified:
    hadoop/pig/branches/branch-0.3/CHANGES.txt
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    
hadoop/pig/branches/branch-0.3/src/org/apache/pig/impl/io/PigNullableWritable.java
    hadoop/pig/branches/branch-0.3/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/branch-0.3/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/CHANGES.txt?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.3/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.3/CHANGES.txt Tue Jun  9 22:15:00 2009
@@ -67,6 +67,10 @@
 
 BUG FIXES
 
+PIG-835: Multiquery optimization does not handle the case where the map keys
+in the split plans have different key types (tuple and non tuple key type)
+(pradeepkth)
+
 PIG-839: incorrect return codes on failure when using -f or -e flags (hagletin
 via sms)
 

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Tue Jun  9 22:15:00 2009
@@ -35,6 +35,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -420,12 +421,28 @@
                 lr.setKeyType(DataType.TUPLE);
             }
         } else if (leaf instanceof POSplit) {
+            // if the map plan that we are trying to merge
+            // has a split, we need to update the indices of
+            // the POLocalRearrange operators in the inner plans
+            // of the split to be a continuation of the index
+            // number sequence we are currently at.
+            // So for example, if we are in the MapRedOper
+            // we are currently processing, if the index is currently
+            // at 1 (meaning index 0 was used for a map plan
+            // merged earlier), then we want the POLocalRearrange
+            // operators in the split to have indices 1, 2 ...
+            // essentially we are flattening the index numbers
+            // across all POLocalRearranges in all merged map plans
+            // including nested ones in POSplit
             POSplit spl = (POSplit)leaf;
             curIndex = setIndexOnLRInSplit(index, spl);
         }
                     
         splitOp.addPlan(pl);
-               
+        
+        // return the updated index after setting index
+        // on all POLocalRearranges including ones
+        // in inner plans of any POSplit operators
         return curIndex;
     }
     
@@ -439,7 +456,14 @@
             PhysicalOperator leaf = pl.getLeaves().get(0);
             if (leaf instanceof POLocalRearrange) {
                 POLocalRearrange lr = (POLocalRearrange)leaf;
-                try {                    
+                try {
+                    // if the baseindex is set on the demux, then
+                    // POLocalRearranges in its inner plan should really
+                    // be sending an index out by adding the base index
+                    // This is because we would be replicating the demux
+                    // as many times as there are inner plans in the demux
+                    // hence the index coming out of POLocalRearranges
+                    // needs to be adjusted accordingly
                     lr.setMultiQueryIndex(initial + lr.getIndex());            
       
                 } catch (ExecException e) {                   
                     int errCode = 2136;
@@ -474,7 +498,7 @@
     }
     
     private void mergeOneReducePlanWithIndex(PhysicalPlan from, 
-            PhysicalPlan to, int initial, int current) throws VisitorException 
{                    
+            PhysicalPlan to, int initial, int current, byte mapKeyType) throws 
VisitorException {                    
         POPackage pk = (POPackage)from.getRoots().get(0);
         from.remove(pk);
  
@@ -483,7 +507,7 @@
         // with the new indexed key       
         Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = 
pk.getKeyInfo();
         if (keyInfo != null && keyInfo.size() > 0) {      
-            byte b = (byte)(initial | 0x80);
+            byte b = (byte)(initial | PigNullableWritable.mqFlag);
             keyInfo.put(new Integer(b), keyInfo.get(0));
         }     
         
@@ -505,7 +529,7 @@
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
         for (int i=initial; i<current; i++) {
-            demux.addPlan(from);
+            demux.addPlan(from, mapKeyType);
         }
                
         if (demux.isSameMapKeyType()) {
@@ -516,7 +540,7 @@
     }
     
     private void mergeOneCombinePlanWithIndex(PhysicalPlan from,
-            PhysicalPlan to, int initial, int current) throws VisitorException 
{
+            PhysicalPlan to, int initial, int current, byte mapKeyType) throws 
VisitorException {
         POPackage cpk = (POPackage)from.getRoots().get(0);
         from.remove(cpk);
        
@@ -550,7 +574,20 @@
             setBaseIndexOnDemux(initial, locDemux);
         } 
        
-        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);   
  
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+        
+        // if current > initial + 1, it means we had
+        // a split in the map of the MROper we are trying to
+        // merge. In that case we would have changed the indices
+        // of the POLocalRearranges in the split to be in the
+        // range initial to current. To handle key, value pairs
+        // coming out of those POLocalRearranges, we replicate
+        // the Package as many times (in this case, the package
+        // would have to be a POMultiQueryPackage since we had
+        // a POSplit in the map). That Package would have a baseindex
+        // correctly set (in the beginning of this method) and would
+        // be able to handle the outputs from the different
+        // POLocalRearranges.
         for (int i=initial; i<current; i++) {
             pkg.addPackage(cpk);
         }
@@ -561,9 +598,12 @@
         } 
         
         pkg.setKeyType(cpk.getKeyType());
-                
+        
+        // See comment above for why we replicated the Package
+        // in the from plan - for the same reason, we replicate
+        // the Demux operators now.
         for (int i=initial; i<current; i++) {
-            demux.addPlan(from);
+            demux.addPlan(from, mapKeyType);
         }
     }
     
@@ -600,7 +640,7 @@
        
         boolean sameKeyType = hasSameMapKeyType(mergeList);
         
-        log.info("Splittees have the same key type: " + sameKeyType);
+        log.debug("Splittees have the same key type: " + sameKeyType);
         
         // create a new reduce plan that will be the container
         // for the multiple reducer plans of the MROpers in the mergeList
@@ -611,13 +651,17 @@
         PhysicalPlan comPl = needCombiner(mergeList) ? 
                 createDemuxPlan(sameKeyType, true) : null;
 
-        log.info("Splittees have combiner: " + (comPl != null));
+        log.debug("Splittees have combiner: " + (comPl != null));
                 
         int index = 0;             
         
         for (MapReduceOper mrOp : mergeList) {
 
-            // merge the map plan            
+            // merge the map plan - this will recursively
+            // set index on all POLocalRearranges encountered
+            // including ones in inner plans of any POSplit
+            // operators. Hence the index returned could be
+            // > index + 1
             int incIndex = mergeOneMapPlanWithIndex(
                     mrOp.mapPlan, splitOp, index, sameKeyType);
                        
@@ -625,7 +669,7 @@
             if (comPl != null) {
                 if (!mrOp.combinePlan.isEmpty()) {                    
                     mergeOneCombinePlanWithIndex(
-                            mrOp.combinePlan, comPl, index, incIndex);
+                            mrOp.combinePlan, comPl, index, incIndex, 
mrOp.mapKeyType);
                 } else {         
                     int errCode = 2141;
                     String msg = "Internal Error. Cannot merge non-combiner 
with combiners for optimization.";
@@ -635,7 +679,7 @@
 
             // merge the reducer plan
             mergeOneReducePlanWithIndex(
-                    mrOp.reducePlan, redPl, index, incIndex);
+                    mrOp.reducePlan, redPl, index, incIndex, mrOp.mapKeyType);
            
             index = incIndex;
             

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MROperPlan.java
 Tue Jun  9 22:15:00 2009
@@ -17,8 +17,12 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
 
 
 /**
@@ -33,5 +37,23 @@
     public MROperPlan() {
         // TODO Auto-generated constructor stub
     }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        MRPrinter printer = new MRPrinter(ps, this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException e) {
+            // TODO Auto-generated catch block
+            throw new RuntimeException("Unable to get String representation of 
plan:" + e );
+        }
+        return baos.toString();
+    }
 
 }

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
 Tue Jun  9 22:15:00 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -73,7 +74,18 @@
      * The list of sub-plans the inner plan is composed of
      */
     private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
-           
+    
+    /**
+     * If the POLocalRearranges corresponding to the reduce plans in 
+     * myPlans (the list of inner plans of the demux) have different key types
+     * then the MultiQueryOptimizer converts all the keys to be of type tuple
+     * by wrapping any non-tuple keys into Tuples (keys which are already 
tuples
+     * are left alone).
+     * The list below is a list of booleans indicating whether extra tuple 
wrapping
+     * was done for the key in the corresponding POLocalRearranges and if we 
need
+     * to "unwrap" the tuple to get to the key
+     */
+    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
     /*
      * Flag indicating when a new pull should start 
      */
@@ -158,7 +170,7 @@
 
     @Override
     public String name() {
-        return "Demux - " + mKey.toString();
+        return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " + 
mKey.toString();
     }
 
     @Override
@@ -203,9 +215,13 @@
      * 
      * @param inPlan plan to be appended to the inner plan list
      */
-    public void addPlan(PhysicalPlan inPlan) {  
+    public void addPlan(PhysicalPlan inPlan, byte mapKeyType) {  
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
+        // if mapKeyType is already a tuple, we will NOT
+        // be wrapping it in an extra tuple. If it is not
+        // a tuple, we will wrap it in a tuple
+        isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
     }
    
     @Override
@@ -259,8 +275,7 @@
         if (res.returnStatus == POStatus.STATUS_EOP) {
             getNext = true;
         }
-        
-        return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+        return (res.returnStatus == POStatus.STATUS_OK || res.returnStatus == 
POStatus.STATUS_ERR) ? res : empty;
     }
 
     private Result getStreamCloseResult() throws ExecException {
@@ -334,10 +349,10 @@
         int index = key.getIndex();
         index &= idxPart;
         index -= baseIndex;                         
-                           
+        
         PhysicalPlan pl = myPlans.get(index);
         if (!(pl.getRoots().get(0) instanceof PODemux)) {                      
       
-            if (!sameMapKeyType & !inCombiner) {                               
        
+            if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {   
                                    
                 Tuple tup = (Tuple)key.getValueAsPigType();
                 res.set(0, tup.get(0));
             } else {

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 Tue Jun  9 22:15:00 2009
@@ -36,6 +36,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -177,14 +178,30 @@
     }
     
     private void setIndex(int index, boolean multiQuery) throws ExecException {
-        if (index > 0x7F) {
+        if (index > PigNullableWritable.idxSpace) { 
+            // indices in group and cogroup should only
+            // be in the range 0x00 to 0x7F (only 127 possible
+            // inputs)
             int errCode = 1082;
             String msg = multiQuery? 
                     "Merge more than 127 map-reduce jobs not supported."
                   : "Cogroups with more than 127 inputs not supported.";
             throw new ExecException(msg, errCode, PigException.INPUT);
         } else {
-            this.index = multiQuery ? (byte)(index | 0x80) : (byte)index;
+            // We could potentially be sending the (key, value) relating to 
+            // multiple "group by" statements through one map reduce job
+            // in  multiquery optimized execution. In this case, we want
+            // two keys which have the same content but coming from different
+            // group by operations to be treated differently so that they
+            // go to different invocations of the reduce(). To achieve this
+            // we let the index be outside the regular index space - 0x00 to 
0x7F
+            // by ORing with the mqFlag bitmask which will put the index above
+            // the 0x7F value. In PigNullableWritable.compareTo if the index is
+            // in this "multiquery" space, we also consider the index when 
comparing
+            // two PigNullableWritables and not just the contents. Keys with 
same
+            // contents coming from different "group by" operations would have 
different
+            // indices and hence would go to different invocation of reduce()
+            this.index = multiQuery ? (byte)(index | 
PigNullableWritable.mqFlag) : (byte)index;
         }            
         lrOutput.set(0, new Byte(this.index));
     }

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
 Tue Jun  9 22:15:00 2009
@@ -109,7 +109,7 @@
 
     @Override
     public String name() {
-        return "MultiQuery Package - " +  getOperatorKey().toString();
+        return "MultiQuery Package[" + baseIndex +"] - " +  
getOperatorKey().toString();
     }
 
     @Override
@@ -187,7 +187,16 @@
         
         Tuple tuple = (Tuple)res.result;
 
-        // replace the wrapped value in the key with the key itself
+        // the key present in the first field
+        // of the tuple above is the real key without
+        // index information - this is because the
+        // package above, extracts the real key out of
+        // the PigNullableWritable key - we are going to
+        // give this result tuple to a PODemux operator
+        // which needs a PigNullableWritable key so
+        // it can figure out the index - we already have
+        // the PigNullableWritable key cached in "myKey"
+        // let's send this in the result tuple
         tuple.set(0, myKey);
 
         return res;

Modified: 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/impl/io/PigNullableWritable.java
 (original)
+++ 
hadoop/pig/branches/branch-0.3/src/org/apache/pig/impl/io/PigNullableWritable.java
 Tue Jun  9 22:15:00 2009
@@ -36,9 +36,18 @@
  */
 public abstract class PigNullableWritable implements WritableComparable {
 
-    private static byte mqFlag = (byte)0x80;
+    /**
+     * indices in multiquery optimized maps
+     * will have the Most Significant Bit set
+     * This is a bitmask used in those cases.
+     */
+    public static final byte mqFlag = (byte)0x80;
     
-    private static byte idxSpace = (byte)0x7F;
+    /**
+     *  regular indices used in group and cogroup
+     *  can only go from 0x00 to 0x7F
+     */
+    public static final byte idxSpace = (byte)0x7F;
     
     private boolean mNull;
 

Modified: 
hadoop/pig/branches/branch-0.3/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.3/test/org/apache/pig/test/TestMultiQuery.java?rev=783156&r1=783155&r2=783156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.3/test/org/apache/pig/test/TestMultiQuery.java 
(original)
+++ hadoop/pig/branches/branch-0.3/test/org/apache/pig/test/TestMultiQuery.java 
Tue Jun  9 22:15:00 2009
@@ -22,7 +22,10 @@
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -39,6 +42,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -456,6 +460,55 @@
     }         
     
     @Test
+    public void testMultiQueryPhase3WithDifferentMapDataTypes3() {
+
+        System.out.println("===== multi-query phase 3 with different map 
datatypes (3) =====");
+
+        try {
+            myPig.setBatchOn();
+            String[] inputData = {"john\t20\t3.4",
+                       "john\t25\t3.4" ,
+                       "henry\t23\t3.9" ,
+                       "adam\t54\t2.9" ,
+                       "henry\t21\t3.9"};
+            Util.createInputFile(cluster, "queryInput.txt", inputData);
+
+            myPig.registerQuery("a = load 'queryInput.txt' " +
+                                 "as (name:chararray, age:int, gpa:double);");
+            myPig.registerQuery("b = group a all;");
+            myPig.registerQuery("c = foreach b generate group, COUNT(a);");
+            myPig.registerQuery("store c into 'foo';");
+            myPig.registerQuery("d = group a by (name, gpa);");
+            myPig.registerQuery("e = foreach d generate flatten(group), 
MIN(a.age);");
+            myPig.registerQuery("store e into 'bar';");
+             
+            myPig.executeBatch();
+            
+            myPig.registerQuery("a = load 'foo' as (grp:chararray, cnt:long) 
;");
+            Iterator<Tuple> it = myPig.openIterator("a");
+            assertEquals(Util.getPigConstant("('all', 5l)"), it.next());
+            assertFalse(it.hasNext());
+            
+            myPig.registerQuery("a = load 'bar' as (name:chararray, 
gpa:double, age:int);");
+            it = myPig.openIterator("a");
+            int i = 0;
+            Map<String, Tuple> expectedResults = new HashMap<String, Tuple>();
+            expectedResults.put("john", (Tuple) 
Util.getPigConstant("('john',3.4,20)"));
+            expectedResults.put("adam", (Tuple) 
Util.getPigConstant("('adam',2.9,54)"));
+            expectedResults.put("henry", (Tuple) 
Util.getPigConstant("('henry',3.9,21)"));
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                i++;
+                assertEquals(expectedResults.get(t.get(0)), t);
+            }
+            assertEquals(3, i);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
     public void testMultiQueryPhase3StreamingInReducer() {
 
         System.out.println("===== multi-query phase 3 with streaming in 
reducer =====");


Reply via email to