Author: olga Date: Thu Oct 2 19:33:29 2008 New Revision: 701281 URL: http://svn.apache.org/viewvc?rev=701281&view=rev Log: PIG-464: bag schema definition
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=701281&r1=701280&r2=701281&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Thu Oct 2 19:33:29 2008 @@ -269,3 +269,5 @@ PIG-470: TextLoader should produce bytearrays (sms via olgan) PIG-335: lineage (sms vi olgan) + + PIG-464: bag schema definition (pradeepk via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=701281&r1=701280&r2=701281&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct 2 19:33:29 2008 @@ -405,7 +405,7 @@ try { FieldSchema fs = new FieldSchema(alias, (schema == null ? null : schema.clone()), type); - fs.canonicalName = canonicalName; + fs.canonicalName = canonicalNamer.getNewName(); if (canonicalMap != null) { fs.canonicalMap = new HashMap<String, LogicalOperator>(canonicalMap); @@ -498,13 +498,13 @@ private List<FieldSchema> mFields; private Map<String, FieldSchema> mAliases; - private MultiMap<FieldSchema, String> mFieldSchemas; + private MultiMap<String, String> mFieldSchemas; private static Log log = LogFactory.getLog(Schema.class); public Schema() { mFields = new ArrayList<FieldSchema>(); mAliases = new HashMap<String, FieldSchema>(); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); } /** @@ -513,12 +513,12 @@ public Schema(List<FieldSchema> fields) { mFields = fields; mAliases = new HashMap<String, FieldSchema>(fields.size()); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); for (FieldSchema fs : fields) { if (fs.alias != null) { mAliases.put(fs.alias, fs); if(null != fs) { - mFieldSchemas.put(fs, fs.alias); + mFieldSchemas.put(fs.canonicalName, fs.alias); } } } @@ -532,11 +532,11 @@ mFields = new ArrayList<FieldSchema>(1); mFields.add(fieldSchema); mAliases = new HashMap<String, FieldSchema>(1); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); if (fieldSchema.alias != null) { mAliases.put(fieldSchema.alias, fieldSchema); if(null != fieldSchema) { - mFieldSchemas.put(fieldSchema, fieldSchema.alias); + mFieldSchemas.put(fieldSchema.canonicalName, fieldSchema.alias); } } } @@ -550,7 +550,7 @@ if(null != s) { mFields = new ArrayList<FieldSchema>(s.size()); mAliases = new HashMap<String, FieldSchema>(); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); try { for (int i = 0; i < s.size(); ++i) { FieldSchema fs = new FieldSchema(s.getField(i)); @@ -559,7 +559,7 @@ if (fs.alias != null) { mAliases.put(fs.alias, fs); if(null != fs) { - mFieldSchemas.put(fs, fs.alias); + mFieldSchemas.put(fs.canonicalName, fs.alias); } } } @@ -567,12 +567,12 @@ } catch (ParseException pe) { mFields = new ArrayList<FieldSchema>(); mAliases = new HashMap<String, FieldSchema>(); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); } } else { mFields = new ArrayList<FieldSchema>(); mAliases = new HashMap<String, FieldSchema>(); - mFieldSchemas = new MultiMap<FieldSchema, String>(); + mFieldSchemas = new MultiMap<String, String>(); } } @@ -699,7 +699,7 @@ if (ourFs.alias != null) { log.debug("Removing ourFs.alias: " + ourFs.alias); mAliases.remove(ourFs.alias); - Collection<String> aliases = mFieldSchemas.get(ourFs); + Collection<String> aliases = mFieldSchemas.get(ourFs.canonicalName); if (aliases != null) { List<String> listAliases = new ArrayList<String>(); for(String alias: aliases) { @@ -707,7 +707,7 @@ } for(String alias: listAliases) { log.debug("Removing alias " + alias + " from multimap"); - mFieldSchemas.remove(ourFs, alias); + mFieldSchemas.remove(ourFs.canonicalName, alias); } } } @@ -715,7 +715,7 @@ log.debug("Setting alias to: " + otherFs.alias); mAliases.put(ourFs.alias, ourFs); if(null != ourFs.alias) { - mFieldSchemas.put(ourFs, ourFs.alias); + mFieldSchemas.put(ourFs.canonicalName, ourFs.alias); } } if (otherFs.type != DataType.UNKNOWN) { @@ -761,10 +761,13 @@ // list with copies of the existing field schemas. Map<FieldSchema, FieldSchema> fsMap = new HashMap<FieldSchema, FieldSchema>(size()); + Map<String, FieldSchema> fsCanonicalNameMap = + new HashMap<String, FieldSchema>(size()); for (FieldSchema fs : mFields) { FieldSchema copy = fs.clone(); s.mFields.add(copy); fsMap.put(fs, copy); + fsCanonicalNameMap.put(fs.canonicalName, copy); } // Build the aliases map @@ -777,10 +780,10 @@ } // Build the field schemas map - for (FieldSchema oldFs : mFieldSchemas.keySet()) { - FieldSchema newFs = fsMap.get(oldFs); + for (String oldFsCanonicalName : mFieldSchemas.keySet()) { + FieldSchema newFs = fsCanonicalNameMap.get(oldFsCanonicalName); assert(newFs != null); - s.mFieldSchemas.put(newFs, mFieldSchemas.get(oldFs)); + s.mFieldSchemas.put(newFs.canonicalName, mFieldSchemas.get(oldFsCanonicalName)); } return s; @@ -919,7 +922,7 @@ if(null != alias) { mAliases.put(alias, fs); if(null != fs) { - mFieldSchemas.put(fs, alias); + mFieldSchemas.put(fs.canonicalName, alias); } } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java?rev=701281&r1=701280&r2=701281&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/MultiMap.java Thu Oct 2 19:33:29 2008 @@ -160,6 +160,25 @@ return values; } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + Set<K> keys = mMap.keySet(); + boolean hasNext = false; + sb.append("{"); + for (K k : keys) { + if(hasNext) { + sb.append(","); + } else { + hasNext = true; + } + sb.append(k.toString() + "="); + sb.append(mMap.get(k)); + } + sb.append("}"); + return sb.toString(); + } + /** * Get the number of entries in the map. * @return number of entries. Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=701281&r1=701280&r2=701281&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Oct 2 19:33:29 2008 @@ -17,6 +17,8 @@ */ package org.apache.pig.test; +import static org.apache.pig.ExecType.MAPREDUCE; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -487,5 +489,40 @@ assertEquals(5, numIdentity); } + @Test + public void testComplexData() throws IOException, ExecException { + // Create input file with ascii data + File input = Util.createInputFile("tmp", "", + new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"}); + + pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " + + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);"); + pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';"); + Iterator<Tuple> it = pigServer.openIterator("b"); + Tuple t = it.next(); + assertEquals(new Long(2), t.get(0)); + assertEquals("1", t.get(1).toString()); + assertEquals("2", t.get(2).toString()); + assertEquals("value1", t.get(3).toString()); + assertEquals("value2", t.get(4).toString()); + + //test with BinStorage + pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " + + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);"); + String output = "/pig/out/TestEvalPipeline-testComplexData"; + pigServer.deleteFile(output); + pigServer.store("a", output, BinStorage.class.getName()); + pigServer.registerQuery("x = load '" + output +"' using BinStorage() " + + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);"); + pigServer.registerQuery("y = foreach x generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';"); + it = pigServer.openIterator("y"); + t = it.next(); + assertEquals(new Long(2), t.get(0)); + assertEquals("1", t.get(1).toString()); + assertEquals("2", t.get(2).toString()); + assertEquals("value1", t.get(3).toString()); + assertEquals("value2", t.get(4).toString()); + + } }