Author: pisong
Date: Wed Jul 23 03:42:58 2008
New Revision: 679062

URL: http://svn.apache.org/viewvc?rev=679062&view=rev
Log:
PIG-306 Fixed schema of group by multiple fields

Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=679062&r1=679061&r2=679062&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java	(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java	Wed Jul 23 03:42:58 2008
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.MultiMap;
@@ -107,80 +108,81 @@
             // one more to account for the "group"
             // the alias of the first field is group and hence the
             // string "group"
-            // TODO The type of the field named "group" requires
-            // type promotion and the like
 
             /*
              * Here goes an attempt to describe how the schema for the first
-             * column - 'group' should look like If the number of group by
+             * column - 'group' should look like. If the number of group by
              * columns = 1 then the schema for 'group' is the
              * schema(fieldschema(col)) If the number of group by columns > 1
              * then find the set union of the group by columns and form the
              * schema as schema(list<fieldschema of the cols>)
              * The parser will ensure that the number of group by columns are
              * the same across all inputs. The computation of the schema for 
group
-             * is 
+             * is as follows:
+             * For each input of cogroup, for each operator (projection ,udf, 
constant), etc.
+             * compute the multimaps <group_column_number, alias> and 
<group_column_number, operator>
+             * and <alias, expression_operator>
+             * Also set the lookup table for each alias to false
              */
 
             Schema groupBySchema = null;
             List<Schema.FieldSchema> groupByFss = new 
ArrayList<Schema.FieldSchema>();
-            Set<String> groupByAliases = new HashSet<String>();
-            Map<String, Boolean> lookup = new HashMap<String, Boolean>();
+            Map<String, Boolean> aliasLookup = new HashMap<String, Boolean>();
             MultiMap<String, ExpressionOperator> aliasExop = new 
MultiMap<String, ExpressionOperator>();
-            MultiMap<Integer, String> positionAlias= new MultiMap<Integer, 
String>();
+            MultiMap<Integer, String> positionAlias = new MultiMap<Integer, 
String>();
+            MultiMap<Integer, ExpressionOperator> positionOperators = new 
MultiMap<Integer, ExpressionOperator>();
             
             for (LogicalOperator op : inputs) {
-                log.debug("GBY Input: " + op.getClass().getName());
+                int position = 0;
                 for(LogicalPlan plan: mGroupByPlans.get(op)) {
-                    int position = 0;
                     for(LogicalOperator eOp: plan.getLeaves()) {
-                        log.debug("Leaf: " + eOp);
                         Schema.FieldSchema fs = 
((ExpressionOperator)eOp).getFieldSchema();
-                        if(null != fs) {
-                            Schema eOpSchema = fs.schema;
-                            log.debug("Computing the lookup tables");
-                            if (null != fs) {
-                                String alias = fs.alias;
-                                //for (String alias : eOpSchema.getAliases()) {
-                                if(null != alias) {
-                                    log.debug("Adding alias to GBY: " + alias);
-                                    groupByAliases.add(alias);
-                                    lookup.put(alias, false);
-                                    aliasExop.put(alias, 
(ExpressionOperator)eOp);                            
-                                    positionAlias.put(position, alias);
-                                }
+                        if (null != fs) {
+                            String alias = fs.alias;
+                            if(null != alias) {
+                                aliasLookup.put(alias, false);
+                                aliasExop.put(alias, (ExpressionOperator)eOp); 
                           
+                                positionAlias.put(position, alias);
                             }
+                            //store the operators for each position in the 
group
+                        } else {
+                            log.warn("Field Schema of an expression operator 
cannot be null"); 
                         }
+                        positionOperators.put(position, 
(ExpressionOperator)eOp);
                     }
                     ++position;
                 }
             }
             
-            log.debug("Computed the lookup table");
-
+            /*
+             * Now that the multi maps and the look up table are computed, do 
the following:
+             * for each column in the group, in order check if the alias is 
alaready used or not
+             * If the alias is already used, check for the next unused alias.
+             * IF none of the aliases can be used then the alias of that 
column is null
+             * If an alias is found usable, then use that alias and the schema 
of the expression operator
+             * corresponding to that position. Note that the first operator 
for that position is
+             * picked. The type checker will ensure that the correct schema is 
merged
+             */
             int arity = mGroupByPlans.get(inputs.get(0)).size();
-            log.debug("Arity: " + arity);
             for (int i = 0; i < arity; ++i) {
-                Collection<String> cAliases;
-                cAliases = positionAlias.get(i);
+                Collection<String> cAliases = positionAlias.get(i);
                 if(null != cAliases) {
                     Object[] aliases = cAliases.toArray();
                     for(int j = 0; j < aliases.length; ++j) {
                         String alias = (String) aliases[j];
                         if(null != alias) {
-                            Collection<ExpressionOperator> cEops = 
aliasExop.get(alias);
+                            //Collection<ExpressionOperator> cEops = 
aliasExop.get(alias);
+                            Collection<ExpressionOperator> cEops = 
positionOperators.get(i);
                             if(null != cEops) {
                                 ExpressionOperator eOp = (ExpressionOperator) 
(cEops.toArray())[0];
                                 if(null != eOp) {
-                                    if(!lookup.get(alias)) {
+                                    if(!aliasLookup.get(alias)) {
                                         Schema.FieldSchema fs = 
eOp.getFieldSchema();
                                         if(null != fs) {
-                                            log.debug("Added fs with alias " + 
alias + " and fs.schema " + fs.schema);
-                                            groupByFss.add(new 
Schema.FieldSchema(alias, fs.schema));
-                                            lookup.put(alias, true);
+                                            groupByFss.add(new 
Schema.FieldSchema(alias, fs.schema, fs.type));
+                                            aliasLookup.put(alias, true);
                                         } else {
-                                            log.debug("Added fs with alias " + 
alias + " and schema null");
-                                            groupByFss.add(new 
Schema.FieldSchema(alias, null));
+                                            groupByFss.add(new 
Schema.FieldSchema(alias, null, DataType.BYTEARRAY));
                                         }
                                     } else {
                                         if(j < aliases.length) {
@@ -190,11 +192,9 @@
                                             //just add the schema of the 
expression operator with the null alias
                                             Schema.FieldSchema fs = 
eOp.getFieldSchema();
                                             if(null != fs) {
-                                                log.debug("Added fs with alias 
null and schema " + fs.schema);
-                                                groupByFss.add(new 
Schema.FieldSchema(null, fs.schema));
+                                                groupByFss.add(new 
Schema.FieldSchema(null, fs.schema, fs.type));
                                             } else {
-                                                log.debug("Added fs with alias 
null and schema null");
-                                                groupByFss.add(new 
Schema.FieldSchema(null, null));
+                                                groupByFss.add(new 
Schema.FieldSchema(null, null, DataType.BYTEARRAY));
                                             }
                                             break;
                                         }
@@ -205,7 +205,7 @@
                                 }
                             } else {
                                 //should not be here
-                                log.debug("Cannot be here: we cannot have an 
alias without an expression operator");
+                                log.debug("Cannot be here: we should have an 
expression operator at each position");
                             }
                         } else {
                             //should not be here
@@ -215,39 +215,27 @@
                 } else {
                     //We do not have any alias for this position in the group 
by columns
                     //We have positions $1, $2, etc.
-                    //The schema for these columns is the schema of the 
expression operatore
-                    //and so the alias is null
-                    log.debug("Added fs with alias null and type bytearray");
-                    groupByFss.add(new Schema.FieldSchema(null, 
DataType.BYTEARRAY));                    
+                    groupByFss.add(new Schema.FieldSchema(null, 
DataType.BYTEARRAY));
                 }
+                //The schema for these columns is the merged schema of the 
expression operatore
+                //This part is handled in the type checker
             }            
 
             groupBySchema = new Schema(groupByFss);
-            log.debug("Printing group by schema aliases");
-            groupBySchema.printAliases();
 
             if(1 == arity) {
-                log.debug("Arity == 1");
                 byte groupByType = groupByFss.get(0).type;
                 Schema groupSchema = groupByFss.get(0).schema;
-                log.debug("Type == " + DataType.findTypeName(groupByType));
                 fss.add(new Schema.FieldSchema("group", groupSchema, 
groupByType));
             } else {
                 fss.add(new Schema.FieldSchema("group", groupBySchema));
             }
             for (LogicalOperator op : inputs) {
-                log.debug("Op: " + op.getClass().getName());
-                log.debug("Op Alias: " + op.getAlias());
                 try {
                     Schema cSchema = op.getSchema();
-                    if (null != cSchema) {
-                        log.debug("Printing constituent schema aliases");
-                        cSchema.printAliases();
-                    }
                     fss.add(new Schema.FieldSchema(op.getAlias(), op
                             .getSchema(), DataType.BAG));
                 } catch (FrontendException ioe) {
-                    log.debug("Caught an exception: " + ioe.getMessage());
                     mIsSchemaComputed = false;
                     mSchema = null;
                     throw ioe;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java?rev=679062&r1=679061&r2=679062&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java	(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java	Wed Jul 23 03:42:58 2008
@@ -70,6 +70,7 @@
         mMapKey = mapKey;
         mValueType = valueType;
         mValueSchema = valueSchema;
+        mType = mValueType;
     }
 
     public ExpressionOperator getMap() {
@@ -100,14 +101,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if (!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fss;
             if (DataType.isSchemaType(mValueType)) {
-                fss = new Schema.FieldSchema(null, mValueSchema);
+                mFieldSchema = new Schema.FieldSchema(null, mValueSchema, 
mValueType);
             } else {
-                fss = new Schema.FieldSchema(null, DataType
-                        .findType(mValueType));
+                mFieldSchema = new Schema.FieldSchema(null, mValueType);
             }
 
             mIsFieldSchemaComputed = true;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=679062&r1=679061&r2=679062&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java	(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java	Wed Jul 23 03:42:58 2008
@@ -49,12 +49,13 @@
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 //import org.apache.pig.impl.logicalLayer.LOEval;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.data.DataType;
 
 
 public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -983,6 +984,70 @@
         buildPlan(query);
     }
     
+    @Test
+    public void testQuery85() throws FrontendException {
+        LogicalPlan lp;
+        buildPlan("a = load 'myfile' as (name, age, gpa);");
+        lp = buildPlan("b = group a by (name, age);");
+        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        Schema.FieldSchema nameFs = new Schema.FieldSchema("name", 
DataType.BYTEARRAY);
+        Schema.FieldSchema ageFs = new Schema.FieldSchema("age", 
DataType.BYTEARRAY);
+        Schema.FieldSchema gpaFs = new Schema.FieldSchema("gpa", 
DataType.BYTEARRAY);
+        
+        Schema groupSchema = new Schema(nameFs);
+        groupSchema.add(ageFs);
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", 
groupSchema, DataType.TUPLE);
+        
+        Schema loadSchema = new Schema(nameFs);
+        loadSchema.add(ageFs);
+        loadSchema.add(gpaFs);
+
+        Schema.FieldSchema bagFs = new Schema.FieldSchema("a", loadSchema, 
DataType.BAG);
+        
+        Schema cogroupExpectedSchema = new Schema(groupFs);
+        cogroupExpectedSchema.add(bagFs);
+
+        assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema));
+
+        lp = buildPlan("c = foreach b generate group.name, group.age, 
COUNT(a.gpa);");
+        LOForEach foreach  = (LOForEach) lp.getLeaves().get(0);
+
+        Schema foreachExpectedSchema = new Schema(nameFs);
+        foreachExpectedSchema.add(ageFs);
+        foreachExpectedSchema.add(new Schema.FieldSchema(null, DataType.LONG));
+
+        assertTrue(foreach.getSchema().equals(foreachExpectedSchema));
+    }
+
+    @Test
+    public void testQuery86() throws FrontendException {
+        LogicalPlan lp;
+        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, 
gpa:Float);");
+        lp = buildPlan("b = group a by (name, age);");
+        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        Schema.FieldSchema nameFs = new Schema.FieldSchema("name", 
DataType.CHARARRAY);
+        Schema.FieldSchema ageFs = new Schema.FieldSchema("age", 
DataType.INTEGER);
+        Schema.FieldSchema gpaFs = new Schema.FieldSchema("gpa", 
DataType.FLOAT);
+
+        Schema groupSchema = new Schema(nameFs);
+        groupSchema.add(ageFs);
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", 
groupSchema, DataType.TUPLE);
+
+        Schema loadSchema = new Schema(nameFs);
+        loadSchema.add(ageFs);
+        loadSchema.add(gpaFs);
+
+        Schema.FieldSchema bagFs = new Schema.FieldSchema("a", loadSchema, 
DataType.BAG);
+
+        Schema cogroupExpectedSchema = new Schema(groupFs);
+        cogroupExpectedSchema.add(bagFs);
+
+        assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema));
+
+    }
+
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");


Reply via email to