Author: olga
Date: Thu Oct  2 19:33:29 2008
New Revision: 701281

URL: http://svn.apache.org/viewvc?rev=701281&view=rev
Log:
PIG-464: bag schema definition

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=701281&r1=701280&r2=701281&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Thu Oct  2 19:33:29 2008
@@ -269,3 +269,5 @@
     PIG-470: TextLoader should produce bytearrays (sms via olgan)
 
     PIG-335: lineage (sms vi olgan)
+
+    PIG-464: bag schema definition (pradeepk via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=701281&r1=701280&r2=701281&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct  2 19:33:29 2008
@@ -405,7 +405,7 @@
             try {
                 FieldSchema fs = new FieldSchema(alias,
                     (schema == null ? null : schema.clone()), type);
-                fs.canonicalName = canonicalName;
+                fs.canonicalName = canonicalNamer.getNewName();
                 if (canonicalMap != null) {
                     fs.canonicalMap =
                         new HashMap<String, LogicalOperator>(canonicalMap);
@@ -498,13 +498,13 @@
 
     private List<FieldSchema> mFields;
     private Map<String, FieldSchema> mAliases;
-    private MultiMap<FieldSchema, String> mFieldSchemas;
+    private MultiMap<String, String> mFieldSchemas;
     private static Log log = LogFactory.getLog(Schema.class);
 
     public Schema() {
         mFields = new ArrayList<FieldSchema>();
         mAliases = new HashMap<String, FieldSchema>();
-        mFieldSchemas = new MultiMap<FieldSchema, String>();
+        mFieldSchemas = new MultiMap<String, String>();
     }
 
     /**
@@ -513,12 +513,12 @@
     public Schema(List<FieldSchema> fields) {
         mFields = fields;
         mAliases = new HashMap<String, FieldSchema>(fields.size());
-        mFieldSchemas = new MultiMap<FieldSchema, String>();
+        mFieldSchemas = new MultiMap<String, String>();
         for (FieldSchema fs : fields) {
             if (fs.alias != null) {
                 mAliases.put(fs.alias, fs);
                 if(null != fs) {
-                    mFieldSchemas.put(fs, fs.alias);
+                    mFieldSchemas.put(fs.canonicalName, fs.alias);
                 }
             }
         }
@@ -532,11 +532,11 @@
         mFields = new ArrayList<FieldSchema>(1);
         mFields.add(fieldSchema);
         mAliases = new HashMap<String, FieldSchema>(1);
-        mFieldSchemas = new MultiMap<FieldSchema, String>();
+        mFieldSchemas = new MultiMap<String, String>();
         if (fieldSchema.alias != null) {
             mAliases.put(fieldSchema.alias, fieldSchema);
             if(null != fieldSchema) {
-                mFieldSchemas.put(fieldSchema, fieldSchema.alias);
+                mFieldSchemas.put(fieldSchema.canonicalName, fieldSchema.alias);
             }
         }
     }
@@ -550,7 +550,7 @@
         if(null != s) {
             mFields = new ArrayList<FieldSchema>(s.size());
             mAliases = new HashMap<String, FieldSchema>();
-            mFieldSchemas = new MultiMap<FieldSchema, String>();
+            mFieldSchemas = new MultiMap<String, String>();
             try {
                 for (int i = 0; i < s.size(); ++i) {
                     FieldSchema fs = new FieldSchema(s.getField(i));
@@ -559,7 +559,7 @@
                         if (fs.alias != null) {
                             mAliases.put(fs.alias, fs);
                             if(null != fs) {
-                                mFieldSchemas.put(fs, fs.alias);
+                                mFieldSchemas.put(fs.canonicalName, fs.alias);
                             }
                         }
                     }
@@ -567,12 +567,12 @@
             } catch (ParseException pe) {
                 mFields = new ArrayList<FieldSchema>();
                 mAliases = new HashMap<String, FieldSchema>();
-                mFieldSchemas = new MultiMap<FieldSchema, String>();
+                mFieldSchemas = new MultiMap<String, String>();
             }
         } else {
             mFields = new ArrayList<FieldSchema>();
             mAliases = new HashMap<String, FieldSchema>();
-            mFieldSchemas = new MultiMap<FieldSchema, String>();
+            mFieldSchemas = new MultiMap<String, String>();
         }
     }
 
@@ -699,7 +699,7 @@
                     if (ourFs.alias != null) {
                         log.debug("Removing ourFs.alias: " + ourFs.alias);
                         mAliases.remove(ourFs.alias);
-                        Collection<String> aliases = mFieldSchemas.get(ourFs);
+                        Collection<String> aliases = mFieldSchemas.get(ourFs.canonicalName);
                         if (aliases != null) {
                             List<String> listAliases = new ArrayList<String>();
                             for(String alias: aliases) {
@@ -707,7 +707,7 @@
                             }
                             for(String alias: listAliases) {
                                 log.debug("Removing alias " + alias + " from multimap");
-                                mFieldSchemas.remove(ourFs, alias);
+                                mFieldSchemas.remove(ourFs.canonicalName, alias);
                             }
                         }
                     }
@@ -715,7 +715,7 @@
                     log.debug("Setting alias to: " + otherFs.alias);
                     mAliases.put(ourFs.alias, ourFs);
                     if(null != ourFs.alias) {
-                        mFieldSchemas.put(ourFs, ourFs.alias);
+                        mFieldSchemas.put(ourFs.canonicalName, ourFs.alias);
                     }
                 }
                 if (otherFs.type != DataType.UNKNOWN) {
@@ -761,10 +761,13 @@
         // list with copies of the existing field schemas.
         Map<FieldSchema, FieldSchema> fsMap =
             new HashMap<FieldSchema, FieldSchema>(size());
+        Map<String, FieldSchema> fsCanonicalNameMap =
+            new HashMap<String, FieldSchema>(size());
         for (FieldSchema fs : mFields) {
             FieldSchema copy = fs.clone();
             s.mFields.add(copy);
             fsMap.put(fs, copy);
+            fsCanonicalNameMap.put(fs.canonicalName, copy);
         }
 
         // Build the aliases map
@@ -777,10 +780,10 @@
         }
 
         // Build the field schemas map
-        for (FieldSchema oldFs : mFieldSchemas.keySet()) {
-            FieldSchema newFs = fsMap.get(oldFs);
+        for (String oldFsCanonicalName : mFieldSchemas.keySet()) {
+            FieldSchema newFs = fsCanonicalNameMap.get(oldFsCanonicalName);
             assert(newFs != null);
-            s.mFieldSchemas.put(newFs, mFieldSchemas.get(oldFs));
+            s.mFieldSchemas.put(newFs.canonicalName, mFieldSchemas.get(oldFsCanonicalName));
         }
 
         return s;
@@ -919,7 +922,7 @@
         if(null != alias) {
             mAliases.put(alias, fs);
             if(null != fs) {
-                mFieldSchemas.put(fs, alias);
+                mFieldSchemas.put(fs.canonicalName, alias);
             }
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java?rev=701281&r1=701280&r2=701281&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java Thu Oct  2 19:33:29 2008
@@ -160,6 +160,25 @@
         return values;
     }
 
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        Set<K> keys = mMap.keySet();
+        boolean hasNext = false;
+        sb.append("{");
+        for (K k : keys) {
+            if(hasNext) {
+                sb.append(",");
+            } else {
+                hasNext = true;
+            }
+            sb.append(k.toString() + "=");
+            sb.append(mMap.get(k));
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+    
     /**
      * Get the number of entries in the map.
      * @return number of entries.

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=701281&r1=701280&r2=701281&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Oct  2 19:33:29 2008
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.test;
 
+import static org.apache.pig.ExecType.MAPREDUCE;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -487,5 +489,40 @@
         assertEquals(5, numIdentity);
     }
     
+    @Test
+    public void testComplexData() throws IOException, ExecException {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
+        
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
+        pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
+        Iterator<Tuple> it = pigServer.openIterator("b");
+        Tuple t = it.next();
+        assertEquals(new Long(2), t.get(0));
+        assertEquals("1", t.get(1).toString());
+        assertEquals("2", t.get(2).toString());
+        assertEquals("value1", t.get(3).toString());
+        assertEquals("value2", t.get(4).toString());
+        
+        //test with BinStorage
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
+        String output = "/pig/out/TestEvalPipeline-testComplexData";
+        pigServer.deleteFile(output);
+        pigServer.store("a", output, BinStorage.class.getName());
+        pigServer.registerQuery("x = load '" + output +"' using BinStorage() " +
+                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
+        pigServer.registerQuery("y = foreach x generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
+        it = pigServer.openIterator("y");
+        t = it.next();
+        assertEquals(new Long(2), t.get(0));
+        assertEquals("1", t.get(1).toString());
+        assertEquals("2", t.get(2).toString());
+        assertEquals("value1", t.get(3).toString());
+        assertEquals("value2", t.get(4).toString());
+        
+    }
 
 }


Reply via email to