Author: olga
Date: Thu Nov 20 12:05:09 2008
New Revision: 719348

URL: http://svn.apache.org/viewvc?rev=719348&view=rev
Log:
PIG-537: Failure in Hadoop map collect stage due to type mismatch in the keys 
used in cogroup

Modified:
    hadoop/pig/branches/types/CHANGES.txt
    
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
    
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
    hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Thu Nov 20 12:05:09 2008
@@ -318,3 +318,6 @@
     PIG-528: use UDF return in schema computation (sms via olgan)
 
     PIG-527: allow PigStorage to write out complex output (sms via olgan)
+
+    PIG-537: Failure in Hadoop map collect stage due to type mismatch in the
+    keys used in cogroup (pradeepk via olgan)

Modified: 
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- 
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
 (original)
+++ 
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
 Thu Nov 20 12:05:09 2008
@@ -31,6 +31,7 @@
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 public class ImplicitSplitInserter extends LogicalTransformer {
@@ -69,23 +70,62 @@
             List<LogicalOperator> succs = 
                 new 
ArrayList<LogicalOperator>(mPlan.getSuccessors(nodes.get(0)));
             int index = -1;
-            boolean nodeConnectedToSplit = false;
+            // For two successors of nodes.get(0) here is a pictorial
+            // representation of the change required:
+            // BEFORE:
+            // Succ1  Succ2
+            //  \       /
+            //  nodes.get(0)
+            
+            //  SHOULD BECOME:
+            
+            // AFTER:
+            // Succ1          Succ2
+            //   |              |
+            // SplitOutput SplitOutput
+            //      \       /
+            //        Split
+            //          |
+            //        nodes.get(0)
+            
+            // Here is how this will be accomplished.
+            // First (the same) Split Operator will be "inserted between" 
nodes.get(0)
+            // and all its successors. The "insertBetween" API is used which 
makes sure
+            // the ordering of operators in the graph is preserved. So we get 
the following: 
+            // Succ1        Succ2
+            //    |          |
+            //   Split     Split
+            //      \      /  
+            //      nodes.get(0)
+            
+            // Then all but the first connection between nodes.get(0) and the 
Split 
+            // Operator are removed using "disconnect" - so we get the 
following:
+            // Succ1          Succ2
+            //      \       /
+            //        Split
+            //          |
+            //        nodes.get(0)
+            
+            // Now a new SplitOutputOperator is "inserted between" the Split 
operator
+            // and the successors. So we get:
+            // Succ1          Succ2
+            //   |              |
+            // SplitOutput SplitOutput
+            //      \       /
+            //        Split
+            //          |
+            //        nodes.get(0)
+            
+            
+            for (LogicalOperator succ : succs) {
+                mPlan.insertBetween(nodes.get(0), splitOp, succ);
+            }
+            
+            for(int i = 1; i < succs.size(); i++) {
+                mPlan.disconnect(nodes.get(0), splitOp); 
+            }
+
             for (LogicalOperator succ : succs) {
-                if(!nodeConnectedToSplit) {
-                    mPlan.insertBetween(nodes.get(0), splitOp, succ);
-                    // nodes.get(0) should be connected to Split (only once) 
and
-                    // split -> splitoutput -> successor - this is for the 
first successor  
-                    // for the next successor we just want to connect in the 
order 
-                    // split -> splitoutput -> successor without involving 
nodes.get(0)
-                    // in the above call we have connected
-                    // nodes.get(0) to split (we will set the flag
-                    // to true later in this loop iteration). Hence in 
subsequent 
-                    // iterations we will only disconnect nodes.get(0) from its
-                    // successor and connect the split-splitoutput chain
-                    // to the successor
-                } else {
-                    mPlan.disconnect(nodes.get(0), succ);                    
-                }
                 LogicalPlan condPlan = new LogicalPlan();
                 LOConst cnst = new LOConst(mPlan, new OperatorKey(scope, 
                         idGen.getNextNodeId(scope)), new Boolean(true));
@@ -95,22 +135,11 @@
                         new OperatorKey(scope, idGen.getNextNodeId(scope)), 
++index, condPlan);
                 splitOp.addOutput(splitOutput);
                 mPlan.add(splitOutput);
-                
-                if(!nodeConnectedToSplit) {
-                    // node.get(0) should be connected to Split (only once) and
-                    // split to splitoutput to successor - this is for the 
first successor  
-                    // for the next successor we just want to connect in the 
order 
-                    // split - splitoutput - successor.
-                    // the call below is in the first successor case
-                    mPlan.insertBetween(splitOp, splitOutput, succ);    
-                    nodeConnectedToSplit = true;
-                } else {
-                    mPlan.connect(splitOp, splitOutput);
-                    mPlan.connect(splitOutput, succ);
-                }
+                mPlan.insertBetween(splitOp, splitOutput, succ);
                 // Patch up the contained plans of succ
                 fixUpContainedPlans(nodes.get(0), splitOutput, succ, null);
             }
+            
         } catch (Exception e) {
             throw new OptimizerException(e);
         }

Modified: 
hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java 
(original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java 
Thu Nov 20 12:05:09 2008
@@ -50,7 +50,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.test.util.Identity;
+import org.apache.pig.test.utils.Identity;
 
 import junit.framework.TestCase;
 

Modified: 
hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java 
(original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java 
Thu Nov 20 12:05:09 2008
@@ -1,9 +1,13 @@
 package org.apache.pig.test;
 
 
+import static 
org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printTypeGraph;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
@@ -11,6 +15,12 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.test.utils.LogicalPlanTester;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,4 +59,73 @@
         }
         assertEquals(20, cnt);
     }
+    
+    @Test
+    public void testImplicitSplitInCoGroup() throws Exception {
+        // this query is similar to the one reported in JIRA - PIG-537
+        // Create input file
+        File inputA = Util.createInputFile("tmp", "", 
+                new String[] {"a:1", "b:2", "b:20", "c:3", "c:30"});
+        File inputB = Util.createInputFile("tmp", "", 
+                new String[] {"a:first", "b:second", "c:third"});
+        pigServer.registerQuery("a = load 'file:" + 
Util.encodeEscape(inputA.toString()) + 
+                "' using PigStorage(':') as (name:chararray, marks:int);");
+        pigServer.registerQuery("b = load 'file:" + 
Util.encodeEscape(inputB.toString()) + 
+                "' using PigStorage(':') as (name:chararray, 
rank:chararray);");
+        pigServer.registerQuery("c = cogroup a by name, b by name;");
+        pigServer.registerQuery("d = foreach c generate group, 
FLATTEN(a.marks) as newmarks;");
+        pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+        pigServer.registerQuery("f = foreach e generate group, flatten(a), 
flatten(d);");
+        HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+        results.put(1, new Object[] { "a", 1, "a", 1 });
+        results.put(2, new Object[] { "b", 2, "b", 2 });
+        results.put(3, new Object[] { "c", 3, "c", 3 });
+        results.put(20, new Object[] { "b", 20, "b", 20 });
+        results.put(30, new Object[] { "c", 30, "c", 30 });
+        
+        Iterator<Tuple> it = pigServer.openIterator("f");
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            System.err.println("Tuple:" + t);
+            Integer group = (Integer)t.get(0);
+            Object[] groupValues = results.get(group);
+            for(int i = 0; i < 4; i++) {
+                assertEquals(groupValues[i], t.get(i+1));    
+            }
+        }
+    }
+    
+    @Test
+    public void testImplicitSplitInCoGroup2() throws Exception {
+        // this query is similar to the one reported in JIRA - PIG-537
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'file1' using PigStorage(':') as 
(name:chararray, marks:int);");
+        planTester.buildPlan("b = load 'file2' using PigStorage(':') as 
(name:chararray, rank:chararray);");
+        planTester.buildPlan("c = cogroup a by name, b by name;");
+        planTester.buildPlan("d = foreach c generate group, FLATTEN(a.marks) 
as newmarks;");
+        planTester.buildPlan("e = cogroup a by marks, d by newmarks;");
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, 
flatten(a), flatten(d);");
+        
+        // Set the logical plan values correctly in all the operators
+        PlanSetter ps = new PlanSetter(plan);
+        ps.visit();
+        
+        // run through validator
+        CompilationMessageCollector collector = new 
CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;        
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }
+
+        // this will run ImplicitSplitInserter
+        TestLogicalOptimizer.optimizePlan(plan);
+        
+        // get Schema of leaf and compare:
+        Schema expectedSchema = Util.getSchemaFromString("grp: 
int,A::username: chararray,A::marks: int,AB::group: chararray,AB::newmarks: 
int");
+        assertTrue(Schema.equals(expectedSchema, 
plan.getLeaves().get(0).getSchema(),false, true));
+    }
 }

Modified: 
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- 
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java 
(original)
+++ 
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java 
Thu Nov 20 12:05:09 2008
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ByteArrayInputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -51,10 +50,9 @@
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
 import org.apache.pig.impl.logicalLayer.parser.ParseException ;
 import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.test.util.Identity;
+import org.apache.pig.test.utils.Identity;
 
 
 public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -1180,70 +1178,70 @@
         //the first element in group, i.e., name is renamed as myname
         lp = buildPlan("c = foreach b generate flatten(group) as (myname), 
COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: 
chararray, age: int, mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: 
chararray, age: int, mycount: long")));
 
         //the first and second elements in group, i.e., name and age are 
renamed as myname and myage
         lp = buildPlan("c = foreach b generate flatten(group) as (myname, 
myage), COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: 
chararray, myage: int, mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: 
chararray, myage: int, mycount: long")));
 
         //the schema of group is unchanged
         lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) 
as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        
assertTrue(foreach.getSchema().equals(getSchemaFromString("group::name: 
chararray, group::age: int, mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("group::name: 
chararray, group::age: int, mycount: long")));
 
         //the first element in group, i.e., name is renamed as myname 
         lp = buildPlan("c = foreach b generate flatten(group) as myname, 
COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: 
chararray, age: int, mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: 
chararray, age: int, mycount: long")));
 
         //group is renamed as mygroup
         lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as 
mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        
assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: 
chararray, age: int), mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: 
chararray, age: int), mycount: long")));
 
         //group is renamed as mygroup and the first element is renamed as 
myname
         lp = buildPlan("c = foreach b generate group as mygroup:(myname), 
COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        
assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: 
chararray, age: int), mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname:
 chararray, age: int), mycount: long")));
 
         //group is renamed as mygroup and the elements are renamed as myname 
and myage
         lp = buildPlan("c = foreach b generate group as mygroup:(myname, 
myage), COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        
assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: 
chararray, myage: int), mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname:
 chararray, myage: int), mycount: long")));
 
         //group is renamed to mygroup as the tuple schema is empty
         lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) 
as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        
assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: 
chararray, age: int), mycount: long")));
+        
assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: 
chararray, age: int), mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user 
defined schema
         buildPlan("c = load 'another_file';");
         buildPlan("d = cogroup a by $0, c by $0;");
         lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x, y, 
z), COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("x: 
bytearray, y: bytearray, z: bytearray, mycount: long")));
+        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: 
bytearray, y: bytearray, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user 
defined schema
         buildPlan("c = load 'another_file';");
         buildPlan("d = cogroup a by $0, c by $0;");
         lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x: int, 
y: float, z), COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("x: int, y: 
float, z: bytearray, mycount: long")));
+        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: 
int, y: float, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user 
defined schema
         buildPlan("c = load 'another_file';");
         buildPlan("d = cogroup a by $0, c by $0;");
         lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x, 
COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("x: 
bytearray, mycount: long")));
+        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: 
bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user 
defined schema
         buildPlan("c = load 'another_file';");
         buildPlan("d = cogroup a by $0, c by $0;");
         lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x: int, 
COUNT(a) as mycount;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(getSchemaFromString("x: int, 
mycount: long")));
+        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: 
int, mycount: long")));
 
     }
 
@@ -1357,13 +1355,13 @@
         LogicalPlan lp = buildPlan("c = foreach b {d = order a by $1; generate 
flatten(d), MAX(a.age) as max_age;};");
         LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
         LOCogroup cogroup = (LOCogroup) lp.getPredecessors(foreach).get(0);
-        Schema.FieldSchema bagFs = new Schema.FieldSchema("a", 
getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), 
DataType.BAG);
+        Schema.FieldSchema bagFs = new Schema.FieldSchema("a", 
Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), 
DataType.BAG);
         Schema.FieldSchema groupFs = new Schema.FieldSchema("group", 
DataType.BYTEARRAY);
         Schema cogroupExpectedSchema = new Schema();
         cogroupExpectedSchema.add(groupFs);
         cogroupExpectedSchema.add(bagFs);
         assertTrue(Schema.equals(cogroup.getSchema(), cogroupExpectedSchema, 
false, false));
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, max_age: 
double"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, 
max_age: double"), false, true));
     }
 
     @Test
@@ -1415,23 +1413,23 @@
 
         lp = buildPlan("b = foreach a generate 1;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: 
int"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("x: int"), false, true));
 
         lp = buildPlan("b = foreach a generate 1L;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: 
long"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("x: long"), false, true));
 
         lp = buildPlan("b = foreach a generate 1.0;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: 
double"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("x: double"), false, true));
 
         lp = buildPlan("b = foreach a generate 1.0f;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: 
float"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("x: float"), false, true));
 
         lp = buildPlan("b = foreach a generate 'hello';");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: 
chararray"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("x: chararray"), false, true));
     }
 
     @Test
@@ -1443,31 +1441,31 @@
 
         lp = buildPlan("b = foreach a generate (1);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: int)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: int)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1L);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: long)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: long)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: double)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0f);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: float)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: float)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello');");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: chararray)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: chararray)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', 1, 1L, 1.0f, 1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), 
false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: 
double)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', {(1), (1.0)});");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, 
true));
 
     }
 
@@ -1480,39 +1478,39 @@
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (2, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1L, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0f, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0f, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0f, 'hello'), (1.0, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello'), (1.0f, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello', 3.14), (1.0f, 
'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("b:{t:()}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("b:{t:()}"), false, true));
 
     }
 
@@ -1575,7 +1573,7 @@
 
         lp = buildPlan("b = foreach a generate *;");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), 
getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), false, 
true));
+        assertTrue(Schema.equals(foreach.getSchema(), 
Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), 
false, true));
 
     }
 
@@ -1603,8 +1601,8 @@
 
         lp = buildPlan("b = group a by *;");
         cogroup = (LOCogroup) lp.getLeaves().get(0);
-        Schema groupSchema = getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
-        Schema bagASchema = getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
+        Schema groupSchema = Util.getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
+        Schema bagASchema = Util.getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
         Schema.FieldSchema groupFs = new Schema.FieldSchema("group", 
groupSchema, DataType.TUPLE);
         Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, 
DataType.BAG);
         Schema expectedSchema = new Schema(groupFs);
@@ -1623,9 +1621,9 @@
 
         lp = buildPlan("c = group a by *, b by *;");
         cogroup = (LOCogroup) lp.getLeaves().get(0);
-        Schema groupSchema = getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
-        Schema bagASchema = getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
-        Schema bagBSchema = getSchemaFromString("first_name: bytearray, 
enrol_age: bytearray, high_school_gpa: bytearray");
+        Schema groupSchema = Util.getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
+        Schema bagASchema = Util.getSchemaFromString("name: bytearray, age: 
bytearray, gpa: bytearray");
+        Schema bagBSchema = Util.getSchemaFromString("first_name: bytearray, 
enrol_age: bytearray, high_school_gpa: bytearray");
         Schema.FieldSchema groupFs = new Schema.FieldSchema("group", 
groupSchema, DataType.TUPLE);
         Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, 
DataType.BAG);
         Schema.FieldSchema bagBFs = new Schema.FieldSchema("b", bagBSchema, 
DataType.BAG);
@@ -1794,18 +1792,6 @@
 
     }
 
-    private Schema getSchemaFromString(String schemaString) throws 
ParseException {
-        return getSchemaFromString(schemaString, DataType.BYTEARRAY);
-    }
-
-    private Schema getSchemaFromString(String schemaString, byte defaultType) 
throws ParseException {
-        ByteArrayInputStream stream = new 
ByteArrayInputStream(schemaString.getBytes()) ;
-        QueryParser queryParser = new QueryParser(stream) ;
-        Schema schema = queryParser.TupleSchema() ;
-        Schema.setSchemaDefaultType(schema, defaultType);
-        return schema;
-    }
-
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/Util.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/Util.java Thu Nov 20 
12:05:09 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.test;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
@@ -30,6 +31,9 @@
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 public class Util {
     private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -236,4 +240,16 @@
             return "file:/"+encodeEscape(path);
         return "file:"+path;
     }
+
+    public static Schema getSchemaFromString(String schemaString) throws 
ParseException {
+        return Util.getSchemaFromString(schemaString, DataType.BYTEARRAY);
+    }
+
+    static Schema getSchemaFromString(String schemaString, byte defaultType) 
throws ParseException {
+        ByteArrayInputStream stream = new 
ByteArrayInputStream(schemaString.getBytes()) ;
+        QueryParser queryParser = new QueryParser(stream) ;
+        Schema schema = queryParser.TupleSchema() ;
+        Schema.setSchemaDefaultType(schema, defaultType);
+        return schema;
+    }
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java 
(original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java Thu 
Nov 20 12:05:09 2008
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.test.util;
+package org.apache.pig.test.utils;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.Tuple;


Reply via email to