Author: olga
Date: Mon Dec  8 17:59:58 2008
New Revision: 724576

URL: http://svn.apache.org/viewvc?rev=724576&view=rev
Log:
PIG-449: Schemas for bags should contain tuples all the time

Added:
    hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Dec  8 17:59:58 2008
@@ -324,7 +324,10 @@
 
     PIG-538: support for null constants (pradeepk via olgan)
 
-    PIG-385: more null handling (pradeepl via olgan)
+    PIG-385: more null handling (pradeepk via olgan)
 
     PIG-546: FilterFunc calls empty constructor when it should be calling
-    parameterized constructor
+    parameterized constructor (sms via olgan)
+
+    PIG-449: Schemas for bags should contain tuples all the time (pradeepk via
+    olgan)

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java Mon Dec  8 17:59:58 2008
@@ -795,6 +795,10 @@
                 }
                 Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, 
schema, TUPLE);
                 Schema bagSchema = new Schema(tupleFs);
+                // since this schema has tuple field schema which internally
+                // has a list of field schemas for the actual items in the bag
+                // an access to any field in the bag is a  two level access
+                bagSchema.setTwoLevelAccessRequired(true);
                 return new Schema.FieldSchema(null, bagSchema, BAG);
             }
         default: {

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Dec  8 17:59:58 2008
@@ -201,6 +201,30 @@
                                                        //i.e., flatten(A), 
flatten(A.x) and NOT
                                                        //flatten(B.(x,y,z))
                                                        Schema s = 
planFs.schema;
+                                                       if(null != s && 
s.isTwoLevelAccessRequired()) {
+                                                           // this is the case 
where the schema is that of
+                                                   // a bag which has just one 
tuple fieldschema which
+                                                   // in turn has a list of 
fieldschemas. The schema
+                                                           // after flattening 
would consist of the fieldSchemas
+                                                           // present in the 
tuple
+                                                   
+                                                   // check that indeed we 
only have one field schema
+                                                   // which is that of a tuple
+                                                   if(s.getFields().size() != 
1) {
+                                                       throw new 
FrontendException("Expected a bag schema with a single " +
+                                                               "element of 
type "+ DataType.findTypeName(DataType.TUPLE) +
+                                                               " but got a bag 
schema with multiple elements.");
+                                                   }
+                                                   Schema.FieldSchema tupleFS 
= s.getField(0);
+                                                   if(tupleFS.type != 
DataType.TUPLE) {
+                                                       throw new 
FrontendException("Expected a bag schema with a single " +
+                                                               "element of 
type "+ DataType.findTypeName(DataType.TUPLE) +
+                                                               " but got an 
element of type " +
+                                                               
DataType.findTypeName(tupleFS.type));
+                                                   }
+                                                   s = tupleFS.schema;
+                                                           
+                                                       }
                                                        if(null != s) {
                                                                for(int i = 0; 
i < s.size(); ++i) {
                                     Schema.FieldSchema fs;
@@ -330,6 +354,8 @@
                     mSchema = null;
                     mIsSchemaComputed = false;
                     throw fee;
+                } catch (ParseException e) {
+                    throw new FrontendException(e);
                 }
             }
                        //check for duplicate column names and throw an error 
if there are duplicates

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon Dec  8 17:59:58 2008
@@ -234,7 +234,36 @@
                                 if(null != expOpFs) {
                                     Schema s = expOpFs.schema;
                                     if(null != s) {
-                                        Schema.FieldSchema fs = 
s.getField(mProjection.get(0));
+                                        Schema.FieldSchema fs;
+                                        if(s.isTwoLevelAccessRequired()) {
+                                            // this is the case where the 
schema is that of
+                                            // a bag which has just one tuple 
fieldschema which
+                                            // in turn has a list of 
fieldschemas. So the field
+                                            // schema we are trying to 
construct would be of the
+                                            // item we are trying to project 
inside the tuple 
+                                            // fieldschema - because currently 
when we say b.i where
+                                            // b is a bag, we are trying to 
access the item i
+                                            // present in the tuple in the bag.
+                                            
+                                            // check that indeed we only have 
one field schema
+                                            // which is that of a tuple
+                                            if(s.getFields().size() != 1) {
+                                                throw new 
FrontendException("Expected a bag schema with a single " +
+                                                        "element of type "+ 
DataType.findTypeName(DataType.TUPLE) +
+                                                        " but got a bag schema 
with multiple elements.");
+                                            }
+                                            Schema.FieldSchema tupleFS = 
s.getField(0);
+                                            if(tupleFS.type != DataType.TUPLE) 
{
+                                                throw new 
FrontendException("Expected a bag schema with a single " +
+                                                        "element of type "+ 
DataType.findTypeName(DataType.TUPLE) +
+                                                        " but got an element 
of type " +
+                                                        
DataType.findTypeName(tupleFS.type));
+                                            }
+                                            fs = 
tupleFS.schema.getField(mProjection.get(0));
+                                        } else {
+                                            // normal single level access
+                                            fs = 
s.getField(mProjection.get(0));
+                                        }
                                         mFieldSchema = new 
Schema.FieldSchema(fs);
                                         
mFieldSchema.setParent(fs.canonicalName, expressionOperator);
                                     } else {

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Dec  8 17:59:58 2008
@@ -2858,6 +2858,10 @@
        ( t1 = <IDENTIFIER> ) [LOOKAHEAD(2) ":" <BAG> | ":"] "{" (fs = 
SchemaTuple() | {} {fs = new Schema.FieldSchema(null, new Schema());}) "}" 
        {
         s = new Schema(fs);
+        // since this schema has tuple field schema which internally
+        // has a list of field schemas for the actual items in the bag
+        // an access to any field in the bag is a  two level access
+        s.setTwoLevelAccessRequired(true);
                if (null != t1) {
                        log.debug("BAG alias " + t1.image);
                        fs = new Schema.FieldSchema(t1.image, s, DataType.BAG);

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Mon Dec  8 17:59:58 2008
@@ -556,6 +556,28 @@
     private Map<String, FieldSchema> mAliases;
     private MultiMap<String, String> mFieldSchemas;
     private static Log log = LogFactory.getLog(Schema.class);
+    // In bags which have a schema with a tuple which contains
+    // the fields present in it, if we access the second field (say)
+    // we are actually trying to access the second field in the
+    // tuple in the bag. This is currently true for two cases:
+    // 1) bag constants - the schema of bag constant has a tuple
+    // which internally has the actual elements
+    // 2) When bags are loaded from input data, if the user 
+    // specifies a schema with the "bag" type, he has to specify
+    // the bag as containing a tuple with the actual elements in 
+    // the schema declaration. However in both the cases above,
+    // the user can still say b.i where b is the bag and i is 
+    // an element in the bag's tuple schema. So in these cases,
+    // the access should translate to a lookup for "i" in the 
+    // tuple schema present in the bag. To indicate this, the
+    // flag below is used. It is false by default because, 
+    // currently we use bag as the type for relations. However 
+    // the schema of a relation does NOT have a tuple fieldschema
+    // with items in it. Instead, the schema directly has the 
+    // field schema of the items. So for a relation "b", the 
+    // above b.i access would be a direct single level access
+    // of i in b's schema. This is treated as the "default" case
+    private boolean twoLevelAccessRequired = false;
 
     public Schema() {
         mFields = new ArrayList<FieldSchema>();
@@ -604,6 +626,7 @@
     public Schema(Schema s) {
 
         if(null != s) {
+            twoLevelAccessRequired = s.twoLevelAccessRequired;
             mFields = new ArrayList<FieldSchema>(s.size());
             mAliases = new HashMap<String, FieldSchema>();
             mFieldSchemas = new MultiMap<String, String>();
@@ -840,6 +863,7 @@
             s.mFieldSchemas.put(newFs.canonicalName, 
mFieldSchemas.get(oldFsCanonicalName));
         }
 
+        s.twoLevelAccessRequired = twoLevelAccessRequired;
         return s;
     }
 
@@ -962,23 +986,57 @@
      * @return position of the FieldSchema.
      */
     public int getPosition(String alias) throws FrontendException{
-
-        FieldSchema fs = getField(alias);
-
-        if (null == fs) {
-            return -1;
-        }
-
-        log.debug("fs: " + fs);
-        int index = -1;
-        for(int i = 0; i < mFields.size(); ++i) {
-            log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + 
mFields.get(i).alias);
-            if(fs == mFields.get(i)) {index = i;}
+        if(twoLevelAccessRequired) {
+            // this is the case where "this" schema is that of
+            // a bag which has just one tuple fieldschema which
+            // in turn has a list of fieldschemas. The alias supplied
+            // should be treated as an alias in the tuple's schema
+            
+            // check that indeed we only have one field schema
+            // which is that of a tuple
+            if(mFields.size() != 1) {
+                throw new FrontendException("Expected a bag schema with a 
single " +
+                        "element of type "+ 
DataType.findTypeName(DataType.TUPLE) +
+                        " but got a bag schema with multiple elements.");
+            }
+            Schema.FieldSchema tupleFS = mFields.get(0);
+            if(tupleFS.type != DataType.TUPLE) {
+                throw new FrontendException("Expected a bag schema with a 
single " +
+                               "element of type "+ 
DataType.findTypeName(DataType.TUPLE) +
+                               " but got an element of type " +
+                               DataType.findTypeName(tupleFS.type));
+            }
+            
+            // check if the alias supplied is that of the tuple 
+            // itself - then disallow it since we do not allow access
+            // to the tuple itself - we only allow access to the fields
+            // in the tuple
+            if(alias.equals(tupleFS.alias)) {
+                throw new FrontendException("Access to the tuple ("+ alias + 
") of " +
+                               "the bag is disallowed. Only access to the 
elements of " +
+                               "the tuple in the bag is allowed.");
+            }
+            
+            // all is good - get the position from the tuple's schema
+            return tupleFS.schema.getPosition(alias);
+        } else {
+            FieldSchema fs = getField(alias);
+    
+            if (null == fs) {
+                return -1;
+            }
+    
+            log.debug("fs: " + fs);
+            int index = -1;
+            for(int i = 0; i < mFields.size(); ++i) {
+                log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " 
+ mFields.get(i).alias);
+                if(fs == mFields.get(i)) {index = i;}
+            }
+    
+            log.debug("index: " + index);
+            return index;
+            //return mFields.indexOf(fs);
         }
-
-        log.debug("index: " + index);
-        return index;
-        //return mFields.indexOf(fs);
     }
 
     public void addAlias(String alias, FieldSchema fs) {
@@ -1435,7 +1493,9 @@
             }
         }
 
-        return new Schema(outputList) ;
+        Schema s = new Schema(outputList) ;
+        s.setTwoLevelAccessRequired(other.twoLevelAccessRequired);
+        return s;
     }
 
     /**
@@ -1449,6 +1509,20 @@
             FieldSchema.setFieldSchemaDefaultType(fs, t);
         }
     }
+
+    /**
+     * @return the twoLevelAccess
+     */
+    public boolean isTwoLevelAccessRequired() {
+        return twoLevelAccessRequired;
+    }
+
+    /**
+     * @param twoLevelAccess the twoLevelAccess to set
+     */
+    public void setTwoLevelAccessRequired(boolean twoLevelAccess) {
+        this.twoLevelAccessRequired = twoLevelAccess;
+    }
     
 }
 

Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java?rev=724576&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java Mon Dec  8 17:59:58 2008
@@ -0,0 +1,271 @@
+/**
+ * 
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.MultiMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ *
+ */
+public class TestDataBagAccess extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @Test
+    public void testBagConstantAccess() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("a = load 'file:" + 
Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 
'hello')} as mybag:{t:(i: int, d: double, c: chararray)};");
+        pigServer.registerQuery("c = foreach b generate mybag.i, mybag.d, 
mybag.c;");
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        Tuple t = it.next();
+        Object[] results = new Object[] { new Integer(16), new Double(4.0e-2), 
"hello" };
+        Class[] resultClasses = new Class[] { Integer.class, Double.class, 
String.class };
+        assertEquals(results.length, t.size());
+        for (int i = 0; i < results.length; i++) {
+            DataBag bag = (DataBag)t.get(i);
+            assertEquals(results[i], bag.iterator().next().get(0));
+            assertEquals(resultClasses[i], 
bag.iterator().next().get(0).getClass());
+        }
+    }
+    
+    @Test
+    public void testBagConstantAccessFailure() throws IOException, 
ExecException {
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("a = load 'file:" + 
Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 
'hello')} as mybag:{t:(i: int, d: double, c: chararray)};");
+        boolean exceptionOccured = false;
+        try {
+            pigServer.registerQuery("c = foreach b generate mybag.t;");
+        } catch(IOException e) {
+            exceptionOccured = true;
+            String msg = e.getMessage();
+            assertTrue(msg.contains("Only access to the elements of " +
+                    "the tuple in the bag is allowed."));
+        }
+        assertTrue(exceptionOccured);
+    }
+    
+    @Test
+    public void testBagConstantFlatten1() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("A = load 'file:" + 
Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("B = foreach A generate {(('p1-t1-e1', 
'p1-t1-e2'),('p1-t2-e1', 'p1-t2-e2'))," +
+                "(('p2-t1-e1', 'p2-t1-e2'), ('p2-t2-e1', 'p2-t2-e2'))};");
+        pigServer.registerQuery("C = foreach B generate $0 as pairbag { pair: 
( t1: (e1, e2), t2: (e1, e2) ) };");
+        pigServer.registerQuery("D = foreach C generate FLATTEN(pairbag);");
+        pigServer.registerQuery("E = foreach D generate t1.e2 as t1e2, t2.e1 
as t2e1;");
+        Iterator<Tuple> it = pigServer.openIterator("E");
+        // We should get the following two tuples as the result:
+        // (p1-t1-e2,p1-t2-e1)
+        // (p2-t1-e2,p2-t2-e1)
+        Tuple t = it.next();
+        assertEquals("p1-t1-e2", (String)t.get(0));
+        assertEquals("p1-t2-e1", (String)t.get(1));
+        t = it.next();
+        assertEquals("p2-t1-e2", (String)t.get(0));
+        assertEquals("p2-t2-e1", (String)t.get(1));
+        assertFalse(it.hasNext());
+    }
+    
+    @Test
+    public void testBagConstantFlatten2() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"somestring\t10\t{(a,10),(b,20)}"});
+        pigServer.registerQuery("a = load 'file:" + 
Util.encodeEscape(input.toString()) + "' " +
+                       "as (str:chararray, intval:int, 
bg:bag{t:tuple(s:chararray, i:int)});");
+        pigServer.registerQuery("b = foreach a generate str, intval, 
flatten(bg);");
+        pigServer.registerQuery("c = foreach b generate str, intval, s, i;");
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        int i = 0;
+        Object[][] results = new Object[][] { {"somestring", new Integer(10), 
"a", new Integer(10)},
+                {"somestring", new Integer(10), "b", new Integer(20) }};
+        Class[] resultClasses = new Class[] { String.class, Integer.class, 
String.class, Integer.class };
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            for (int j = 0; j < resultClasses.length; j++) {
+                assertEquals(results[i][j], t.get(j));
+                assertEquals(resultClasses[j], t.get(j).getClass());
+            }
+            i++;
+        }
+        assertEquals(results.length, i);
+        
+        pigServer.registerQuery("c = foreach b generate str, intval, bg::s, 
bg::i;");
+        it = pigServer.openIterator("c");
+        i = 0;
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            for (int j = 0; j < resultClasses.length; j++) {
+                assertEquals(results[i][j], t.get(j));
+                assertEquals(resultClasses[j], t.get(j).getClass());
+            }
+            i++;
+        }
+        assertEquals(results.length, i);
+    }
+
+    @Test
+    public void testBagStoreLoad() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"a\tid1", "a\tid2", "a\tid3", "b\tid4", 
"b\tid5", "b\tid6"});
+        pigServer.registerQuery("a = load 'file:" + 
Util.encodeEscape(input.toString()) + "' " +
+                "as (s:chararray, id:chararray);");
+        pigServer.registerQuery("b = group a by s;");
+        Class[] loadStoreClasses = new Class[] { BinStorage.class, 
PigStorage.class };
+        for (int i = 0; i < loadStoreClasses.length; i++) {
+            String output = "/pig/out/TestDataBagAccess-testBagStoreLoad-" + 
+                             loadStoreClasses[i].getName() + ".txt";
+            pigServer.deleteFile(output);
+            pigServer.store("b", output, loadStoreClasses[i].getName());
+            pigServer.registerQuery("c = load '" + output + "' using " + 
loadStoreClasses[i].getName() + "() AS " +
+                    "(gp: chararray, bg:bag { t: tuple (sReLoaded: chararray, 
idReLoaded: chararray)});;");
+            Iterator<Tuple> it = pigServer.openIterator("c");
+            MultiMap<Object, Object> results = new MultiMap<Object, Object>();
+            results.put("a", "id1");
+            results.put("a", "id2");
+            results.put("a", "id3");
+            results.put("b", "id4");
+            results.put("b", "id5");
+            results.put("b", "id6");
+            int j = 0;
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                Object groupKey = t.get(0);
+                DataBag groupBag = (DataBag)t.get(1);
+                Iterator<Tuple> bgIt = groupBag.iterator();
+                int k = 0;
+                while(bgIt.hasNext()) {
+                    // a hash to make sure we don't see the
+                    // same "ids" twice
+                    HashMap<Object, Boolean> seen = new HashMap<Object, 
Boolean>();
+                    Tuple bgt = bgIt.next();
+                    // the first col is the group by key
+                    assertTrue(bgt.get(0).equals(groupKey));
+                    Collection<Object> values = results.get(groupKey);
+                    // check that the second column is one
+                    // of the "id" values associated with this
+                    // group by key
+                    assertTrue(values.contains(bgt.get(1)));
+                    // check that we have not seen the same "id" value
+                    // before
+                    if(seen.containsKey(bgt.get(1)))
+                        fail("LoadStoreClass used : " + 
loadStoreClasses[i].getName() + " " +
+                                       ", duplicate value (" + bgt.get(1) + 
")");
+                    else
+                        seen.put(bgt.get(1), true);
+                    k++;
+                }
+                // check that we saw 3 tuples in each group bag
+                assertEquals(3, k);
+                j++;
+            }
+            // make sure we saw the right number of high
+            // level tuples
+            assertEquals(results.keySet().size(), j);
+            
+            pigServer.registerQuery("d = foreach c generate gp, flatten(bg);");
+            // results should be
+            // a a id1
+            // a a id2
+            // a a id3
+            // b b id4
+            // b b id5
+            // b b id6
+            // However order is not guaranteed
+            List<Tuple> resultTuples = new ArrayList<Tuple>();
+            resultTuples.add(Util.createTuple(new String[] { "a", "a", 
"id1"}));
+            resultTuples.add(Util.createTuple(new String[] { "a", "a", 
"id2"}));
+            resultTuples.add(Util.createTuple(new String[] { "a", "a", 
"id3"}));
+            resultTuples.add(Util.createTuple(new String[] { "b", "b", 
"id4"}));
+            resultTuples.add(Util.createTuple(new String[] { "b", "b", 
"id5"}));
+            resultTuples.add(Util.createTuple(new String[] { "b", "b", 
"id6"}));
+            it = pigServer.openIterator("d");
+            j = 0;
+            HashMap<Tuple, Boolean> seen = new HashMap<Tuple, Boolean>();
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                assertTrue(resultTuples.contains(t));
+                if(seen.containsKey(t)) {
+                    fail("LoadStoreClass used : " + 
loadStoreClasses[i].getName() + " " +
+                                ", duplicate tuple (" + t + ") encountered.");
+                } else {
+                    seen.put(t, true);
+                }
+                j++;
+            }
+            // check we got expected number of tuples
+            assertEquals(resultTuples.size(), j);
+            
+            // same output as above - but projection based on aliases
+            pigServer.registerQuery("e = foreach d generate gp, sReLoaded, 
idReLoaded;");
+            it = pigServer.openIterator("e");
+            j = 0;
+            seen = new HashMap<Tuple, Boolean>();
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                assertTrue(resultTuples.contains(t));
+                if(seen.containsKey(t)) {
+                    fail("LoadStoreClass used : " + 
loadStoreClasses[i].getName() + " " +
+                                ", duplicate tuple (" + t + ") encountered.");
+                } else {
+                    seen.put(t, true);
+                }
+                j++;
+            }
+            // check we got expected number of tuples
+            assertEquals(resultTuples.size(), j);
+
+            // same result as above but projection based on position specifiers
+            pigServer.registerQuery("f = foreach d generate $0, $1, $2;");
+            it = pigServer.openIterator("f");
+            j = 0;
+            seen = new HashMap<Tuple, Boolean>();
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                assertTrue(resultTuples.contains(t));
+                if(seen.containsKey(t)) {
+                    fail("LoadStoreClass used : " + 
loadStoreClasses[i].getName() + " " +
+                                ", duplicate tuple (" + t + ") encountered.");
+                } else {
+                    seen.put(t, true);
+                }
+                j++;
+            }
+            // check we got expected number of tuples
+            assertEquals(resultTuples.size(), j);
+
+        
+        }
+    }
+}

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java Mon Dec  8 17:59:58 2008
@@ -38,6 +38,7 @@
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 import org.junit.Before;
 import org.junit.After;
@@ -551,7 +552,17 @@
         InputStream fileWithStdOutContents = new DataInputStream( new 
BufferedInputStream( new FileInputStream(stdOutRedirectedFile)));
         BufferedReader reader = new BufferedReader(new 
InputStreamReader(fileWithStdOutContents));
         while ((s = reader.readLine()) != null) {
-            assertTrue(s.equals("b: {site: chararray,count: 
int,itemCounts::itemCountsTuple: (type: chararray,typeCount: int,f: float,m: 
map[ ])}") == true);
+            // strip away the initial schema alias and the
+            // curlies surrounding the schema to construct
+            // the schema object from the schema string
+            s = s.replaceAll("^.*\\{", "");
+            s = s.replaceAll("\\}$", "");
+            Schema actual = Util.getSchemaFromString( s);
+            Schema expected = Util.getSchemaFromString(
+                    "site: chararray,count: int," +
+                    "itemCounts::type: chararray,itemCounts::typeCount: int," +
+                    "itemCounts::f: float,itemCounts::m: map[ ]");
+            assertEquals(expected, actual);
         }
 
     }


Reply via email to