Author: pisong Date: Wed Jul 23 03:42:58 2008 New Revision: 679062 URL: http://svn.apache.org/viewvc?rev=679062&view=rev Log: PIG-306 Fixed schema of group by multiple fields
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=679062&r1=679061&r2=679062&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Wed Jul 23 03:42:58 2008 @@ -30,6 +30,7 @@ import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.impl.util.MultiMap; @@ -107,80 +108,81 @@ // one more to account for the "group" // the alias of the first field is group and hence the // string "group" - // TODO The type of the field named "group" requires - // type promotion and the like /* * Here goes an attempt to describe how the schema for the first - * column - 'group' should look like If the number of group by + * column - 'group' should look like. If the number of group by * columns = 1 then the schema for 'group' is the * schema(fieldschema(col)) If the number of group by columns > 1 * then find the set union of the group by columns and form the * schema as schema(list<fieldschema of the cols>) * The parser will ensure that the number of group by columns are * the same across all inputs. The computation of the schema for group - * is + * is as follows: + * For each input of cogroup, for each operator (projection ,udf, constant), etc. + * compute the multimaps <group_column_number, alias> and <group_column_number, operator> + * and <alias, expression_operator> + * Also set the lookup table for each alias to false */ Schema groupBySchema = null; List<Schema.FieldSchema> groupByFss = new ArrayList<Schema.FieldSchema>(); - Set<String> groupByAliases = new HashSet<String>(); - Map<String, Boolean> lookup = new HashMap<String, Boolean>(); + Map<String, Boolean> aliasLookup = new HashMap<String, Boolean>(); MultiMap<String, ExpressionOperator> aliasExop = new MultiMap<String, ExpressionOperator>(); - MultiMap<Integer, String> positionAlias= new MultiMap<Integer, String>(); + MultiMap<Integer, String> positionAlias = new MultiMap<Integer, String>(); + MultiMap<Integer, ExpressionOperator> positionOperators = new MultiMap<Integer, ExpressionOperator>(); for (LogicalOperator op : inputs) { - log.debug("GBY Input: " + op.getClass().getName()); + int position = 0; for(LogicalPlan plan: mGroupByPlans.get(op)) { - int position = 0; for(LogicalOperator eOp: plan.getLeaves()) { - log.debug("Leaf: " + eOp); Schema.FieldSchema fs = ((ExpressionOperator)eOp).getFieldSchema(); - if(null != fs) { - Schema eOpSchema = fs.schema; - log.debug("Computing the lookup tables"); - if (null != fs) { - String alias = fs.alias; - //for (String alias : eOpSchema.getAliases()) { - if(null != alias) { - log.debug("Adding alias to GBY: " + alias); - groupByAliases.add(alias); - lookup.put(alias, false); - aliasExop.put(alias, (ExpressionOperator)eOp); - positionAlias.put(position, alias); - } + if (null != fs) { + String alias = fs.alias; + if(null != alias) { + aliasLookup.put(alias, false); + aliasExop.put(alias, (ExpressionOperator)eOp); + positionAlias.put(position, alias); } + //store the operators for each position in the group + } else { + log.warn("Field Schema of an expression operator cannot be null"); } + positionOperators.put(position, (ExpressionOperator)eOp); } ++position; } } - log.debug("Computed the lookup table"); - + /* + * Now that the multi maps and the look up table are computed, do the following: + * for each column in the group, in order check if the alias is alaready used or not + * If the alias is already used, check for the next unused alias. + * IF none of the aliases can be used then the alias of that column is null + * If an alias is found usable, then use that alias and the schema of the expression operator + * corresponding to that position. Note that the first operator for that position is + * picked. The type checker will ensure that the correct schema is merged + */ int arity = mGroupByPlans.get(inputs.get(0)).size(); - log.debug("Arity: " + arity); for (int i = 0; i < arity; ++i) { - Collection<String> cAliases; - cAliases = positionAlias.get(i); + Collection<String> cAliases = positionAlias.get(i); if(null != cAliases) { Object[] aliases = cAliases.toArray(); for(int j = 0; j < aliases.length; ++j) { String alias = (String) aliases[j]; if(null != alias) { - Collection<ExpressionOperator> cEops = aliasExop.get(alias); + //Collection<ExpressionOperator> cEops = aliasExop.get(alias); + Collection<ExpressionOperator> cEops = positionOperators.get(i); if(null != cEops) { ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0]; if(null != eOp) { - if(!lookup.get(alias)) { + if(!aliasLookup.get(alias)) { Schema.FieldSchema fs = eOp.getFieldSchema(); if(null != fs) { - log.debug("Added fs with alias " + alias + " and fs.schema " + fs.schema); - groupByFss.add(new Schema.FieldSchema(alias, fs.schema)); - lookup.put(alias, true); + groupByFss.add(new Schema.FieldSchema(alias, fs.schema, fs.type)); + aliasLookup.put(alias, true); } else { - log.debug("Added fs with alias " + alias + " and schema null"); - groupByFss.add(new Schema.FieldSchema(alias, null)); + groupByFss.add(new Schema.FieldSchema(alias, null, DataType.BYTEARRAY)); } } else { if(j < aliases.length) { @@ -190,11 +192,9 @@ //just add the schema of the expression operator with the null alias Schema.FieldSchema fs = eOp.getFieldSchema(); if(null != fs) { - log.debug("Added fs with alias null and schema " + fs.schema); - groupByFss.add(new Schema.FieldSchema(null, fs.schema)); + groupByFss.add(new Schema.FieldSchema(null, fs.schema, fs.type)); } else { - log.debug("Added fs with alias null and schema null"); - groupByFss.add(new Schema.FieldSchema(null, null)); + groupByFss.add(new Schema.FieldSchema(null, null, DataType.BYTEARRAY)); } break; } @@ -205,7 +205,7 @@ } } else { //should not be here - log.debug("Cannot be here: we cannot have an alias without an expression operator"); + log.debug("Cannot be here: we should have an expression operator at each position"); } } else { //should not be here @@ -215,39 +215,27 @@ } else { //We do not have any alias for this position in the group by columns //We have positions $1, $2, etc. - //The schema for these columns is the schema of the expression operatore - //and so the alias is null - log.debug("Added fs with alias null and type bytearray"); - groupByFss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)); + groupByFss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)); } + //The schema for these columns is the merged schema of the expression operatore + //This part is handled in the type checker } groupBySchema = new Schema(groupByFss); - log.debug("Printing group by schema aliases"); - groupBySchema.printAliases(); if(1 == arity) { - log.debug("Arity == 1"); byte groupByType = groupByFss.get(0).type; Schema groupSchema = groupByFss.get(0).schema; - log.debug("Type == " + DataType.findTypeName(groupByType)); fss.add(new Schema.FieldSchema("group", groupSchema, groupByType)); } else { fss.add(new Schema.FieldSchema("group", groupBySchema)); } for (LogicalOperator op : inputs) { - log.debug("Op: " + op.getClass().getName()); - log.debug("Op Alias: " + op.getAlias()); try { Schema cSchema = op.getSchema(); - if (null != cSchema) { - log.debug("Printing constituent schema aliases"); - cSchema.printAliases(); - } fss.add(new Schema.FieldSchema(op.getAlias(), op .getSchema(), DataType.BAG)); } catch (FrontendException ioe) { - log.debug("Caught an exception: " + ioe.getMessage()); mIsSchemaComputed = false; mSchema = null; throw ioe; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java?rev=679062&r1=679061&r2=679062&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java Wed Jul 23 03:42:58 2008 @@ -70,6 +70,7 @@ mMapKey = mapKey; mValueType = valueType; mValueSchema = valueSchema; + mType = mValueType; } public ExpressionOperator getMap() { @@ -100,14 +101,12 @@ } @Override - public Schema.FieldSchema getFieldSchema() { + public Schema.FieldSchema getFieldSchema() throws FrontendException { if (!mIsFieldSchemaComputed) { - Schema.FieldSchema fss; if (DataType.isSchemaType(mValueType)) { - fss = new Schema.FieldSchema(null, mValueSchema); + mFieldSchema = new Schema.FieldSchema(null, mValueSchema, mValueType); } else { - fss = new Schema.FieldSchema(null, DataType - .findType(mValueType)); + mFieldSchema = new Schema.FieldSchema(null, mValueType); } mIsFieldSchemaComputed = true; Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=679062&r1=679061&r2=679062&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Wed Jul 23 03:42:58 2008 @@ -49,12 +49,13 @@ import org.apache.pig.impl.logicalLayer.LOCogroup; import org.apache.pig.impl.logicalLayer.LOLoad; //import org.apache.pig.impl.logicalLayer.LOEval; -import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.logicalLayer.*; import org.apache.pig.impl.logicalLayer.ExpressionOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; import org.apache.pig.impl.logicalLayer.LOPrinter; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.data.DataType; public class TestLogicalPlanBuilder extends junit.framework.TestCase { @@ -983,6 +984,70 @@ buildPlan(query); } + @Test + public void testQuery85() throws FrontendException { + LogicalPlan lp; + buildPlan("a = load 'myfile' as (name, age, gpa);"); + lp = buildPlan("b = group a by (name, age);"); + LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0); + + Schema.FieldSchema nameFs = new Schema.FieldSchema("name", DataType.BYTEARRAY); + Schema.FieldSchema ageFs = new Schema.FieldSchema("age", DataType.BYTEARRAY); + Schema.FieldSchema gpaFs = new Schema.FieldSchema("gpa", DataType.BYTEARRAY); + + Schema groupSchema = new Schema(nameFs); + groupSchema.add(ageFs); + Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE); + + Schema loadSchema = new Schema(nameFs); + loadSchema.add(ageFs); + loadSchema.add(gpaFs); + + Schema.FieldSchema bagFs = new Schema.FieldSchema("a", loadSchema, DataType.BAG); + + Schema cogroupExpectedSchema = new Schema(groupFs); + cogroupExpectedSchema.add(bagFs); + + assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema)); + + lp = buildPlan("c = foreach b generate group.name, group.age, COUNT(a.gpa);"); + LOForEach foreach = (LOForEach) lp.getLeaves().get(0); + + Schema foreachExpectedSchema = new Schema(nameFs); + foreachExpectedSchema.add(ageFs); + foreachExpectedSchema.add(new Schema.FieldSchema(null, DataType.LONG)); + + assertTrue(foreach.getSchema().equals(foreachExpectedSchema)); + } + + @Test + public void testQuery86() throws FrontendException { + LogicalPlan lp; + buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);"); + lp = buildPlan("b = group a by (name, age);"); + LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0); + + Schema.FieldSchema nameFs = new Schema.FieldSchema("name", DataType.CHARARRAY); + Schema.FieldSchema ageFs = new Schema.FieldSchema("age", DataType.INTEGER); + Schema.FieldSchema gpaFs = new Schema.FieldSchema("gpa", DataType.FLOAT); + + Schema groupSchema = new Schema(nameFs); + groupSchema.add(ageFs); + Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE); + + Schema loadSchema = new Schema(nameFs); + loadSchema.add(ageFs); + loadSchema.add(gpaFs); + + Schema.FieldSchema bagFs = new Schema.FieldSchema("a", loadSchema, DataType.BAG); + + Schema cogroupExpectedSchema = new Schema(groupFs); + cogroupExpectedSchema.add(bagFs); + + assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema)); + + } + private void printPlan(LogicalPlan lp) { LOPrinter graphPrinter = new LOPrinter(System.err, lp); System.err.println("Printing the logical plan");