Author: thejas Date: Fri Sep 17 00:35:44 2010 New Revision: 997957 URL: http://svn.apache.org/viewvc?rev=997957&view=rev Log: PIG-1610: 'union onschema' does not handle some cases involving 'namespaced' column names in schema
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=997957&r1=997956&r2=997957&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Fri Sep 17 00:35:44 2010 @@ -192,6 +192,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1610: 'union onschema' does handle some cases involving 'namespaced' +column names in schema (thejas) + PIG-1609: 'union onschema' should give a more useful error message when schema of one of the relations has null column name(thejas) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=997957&r1=997956&r2=997957&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Sep 17 00:35:44 2010 @@ -2433,6 +2433,15 @@ LogicalOperator UnionClause(LogicalPlan // null for columns that don't exist in the input ArrayList<LogicalOperator> newInputs = new ArrayList<LogicalOperator>(inputs.size()); + + //create a user defined schema list for use in LOForeach + // using merged schema + ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>(); + for(Schema.FieldSchema fs : mergedSchema.getFields()){ + 
mergedSchemaList.add(new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL))); + } + + for(LogicalOperator lop : inputs) { if(! lop.getSchema().equals(mergedSchema)) @@ -2450,7 +2459,7 @@ LogicalOperator UnionClause(LogicalPlan Schema inpSchema = lop.getSchema(); flattenList.add(Boolean.FALSE); - int inpPos = inpSchema.getPosition(fs.alias); + int inpPos = inpSchema.getPositionSubName(fs.alias); LogicalOperator columnProj = null; boolean isCastNeeded = false; @@ -2478,7 +2487,7 @@ LogicalOperator UnionClause(LogicalPlan //cast is needed if types are different. //compatibility of types has already been checked //during creation of mergedSchema - Schema.FieldSchema inpFs = inpSchema.getField(fs.alias); + Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias); if(inpFs.type != fs.type) isCastNeeded = true; } @@ -2499,7 +2508,8 @@ LogicalOperator UnionClause(LogicalPlan } LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), - generatePlans, flattenList + generatePlans, flattenList, + mergedSchemaList ); lp.add(foreach); lp.connect(lop, foreach); @@ -2514,7 +2524,9 @@ LogicalOperator UnionClause(LogicalPlan // use newInputs as the inputs for union inputs = newInputs; } + + for (LogicalOperator lop: inputs) { lp.connect(lop, union); log.debug("Connected union input operator " + Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=997957&r1=997956&r2=997957&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Sep 17 00:35:44 2010 @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import 
java.util.Collection; +import java.util.Map.Entry; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; @@ -822,6 +823,54 @@ public class Schema implements Serializa } } + + /** + * Given an alias name, find the associated FieldSchema. If exact name is + * not found see if any field matches the part of the 'namespaced' alias. + * eg. if given alias is nm::a , and schema is (a,b). It will return + * FieldSchema of a. + * if given alias is nm::a and schema is (nm2::a, b), it will return null + * @param alias Alias to look up. + * @return FieldSchema, or null if no such alias is in this tuple. + */ + public FieldSchema getFieldSubNameMatch(String alias) throws FrontendException { + if(alias == null) + return null; + FieldSchema fs = getField(alias); + if(fs != null){ + return fs; + } + //fs is null + final String sep = "::"; + ArrayList<FieldSchema> matchedFieldSchemas = new ArrayList<FieldSchema>(); + if(alias.contains(sep)){ + for(FieldSchema field : mFields) { + if(alias.endsWith(sep + field.alias)){ + matchedFieldSchemas.add(field); + } + } + } + if(matchedFieldSchemas.size() > 1){ + boolean hasNext = false; + StringBuilder sb = new StringBuilder("Found more than one " + + "sub alias name match: "); + for (FieldSchema matchFs : matchedFieldSchemas) { + if(hasNext) { + sb.append(", "); + } else { + hasNext = true; + } + sb.append(matchFs.alias); + } + int errCode = 1116; + throw new FrontendException(sb.toString(), errCode, PigException.INPUT); + }else if(matchedFieldSchemas.size() == 1){ + fs = matchedFieldSchemas.get(0); + } + + return fs; + } + /** * Given a field number, find the associated FieldSchema. * @@ -1096,6 +1145,31 @@ public class Schema implements Serializa * @return position of the FieldSchema. */ public int getPosition(String alias) throws FrontendException{ + return getPosition(alias, false); + } + + + /** + * Given an alias, find the associated position of the field schema. 
+ * It uses getFieldSubNameMatch to look for subName matches as well. + * @param alias + * alias of the FieldSchema. + * @return position of the FieldSchema. + */ + public int getPositionSubName(String alias) throws FrontendException{ + return getPosition(alias, true); + } + + + private int getPosition(String alias, boolean isSubNameMatch) + throws FrontendException { + if(isSubNameMatch && twoLevelAccessRequired){ + // should not happen + int errCode = 2248; + String msg = "twoLevelAccessRequired==true is not supported with" + + "and isSubNameMatch==true "; + throw new FrontendException(msg, errCode, PigException.BUG); + } if(twoLevelAccessRequired) { // this is the case where "this" schema is that of // a bag which has just one tuple fieldschema which @@ -1106,7 +1180,7 @@ public class Schema implements Serializa // which is that of a tuple if(mFields.size() != 1) { int errCode = 1008; - String msg = "Expected a bag schema with a single " + + String msg = "Expected a bag schema with a single " + "element of type "+ DataType.findTypeName(DataType.TUPLE) + " but got a bag schema with multiple elements."; throw new FrontendException(msg, errCode, PigException.INPUT); @@ -1114,10 +1188,10 @@ public class Schema implements Serializa Schema.FieldSchema tupleFS = mFields.get(0); if(tupleFS.type != DataType.TUPLE) { int errCode = 1009; - String msg = "Expected a bag schema with a single " + - "element of type "+ DataType.findTypeName(DataType.TUPLE) + - " but got an element of type " + - DataType.findTypeName(tupleFS.type); + String msg = "Expected a bag schema with a single " + + "element of type "+ DataType.findTypeName(DataType.TUPLE) + + " but got an element of type " + + DataType.findTypeName(tupleFS.type); throw new FrontendException(msg, errCode, PigException.INPUT); } @@ -1127,16 +1201,16 @@ public class Schema implements Serializa // in the tuple if(alias.equals(tupleFS.alias)) { int errCode = 1028; - String msg = "Access to the tuple ("+ alias + ") of " + - 
"the bag is disallowed. Only access to the elements of " + - "the tuple in the bag is allowed."; + String msg = "Access to the tuple ("+ alias + ") of " + + "the bag is disallowed. Only access to the elements of " + + "the tuple in the bag is allowed."; throw new FrontendException(msg, errCode, PigException.INPUT); } // all is good - get the position from the tuple's schema return tupleFS.schema.getPosition(alias); } else { - FieldSchema fs = getField(alias); + FieldSchema fs = isSubNameMatch ? getFieldSubNameMatch(alias) : getField(alias); if (null == fs) { return -1; @@ -1379,6 +1453,10 @@ public class Schema implements Serializa boolean allowDifferentSizeMerge, boolean allowIncompatibleTypes) throws SchemaMergeException { + if(schema == null && other == null){ + //if both are null, they are not incompatible + return null; + } if (schema == null) { if (allowIncompatibleTypes) { return null ; @@ -1454,20 +1532,13 @@ public class Schema implements Serializa } else { // merge inner tuple because both sides are tuples + //if inner schema are incompatible and allowIncompatibleTypes==true + // an exception is thrown by mergeSchema Schema mergedSubSchema = mergeSchema(myFs.schema, otherFs.schema, otherTakesAliasPrecedence, allowDifferentSizeMerge, allowIncompatibleTypes) ; - // if they cannot be merged and we don't allow incompatible - // types, just return null meaning cannot merge - if ( (mergedSubSchema == null) && - (!allowIncompatibleTypes) ) { - int errCode = 1032; - String msg = "Incompatible inner schemas for merging schemas. 
" - + " Field schema: " + myFs.schema + " Other field schema: " + otherFs.schema; - throw new SchemaMergeException(msg, errCode, PigException.INPUT) ; - } // create the merged field // the mergedSubSchema can be true if allowIncompatibleTypes @@ -1599,32 +1670,35 @@ public class Schema implements Serializa Schema schema2) throws SchemaMergeException{ Schema mergedSchema = new Schema(); + HashSet<FieldSchema> schema2colsAdded = new HashSet<FieldSchema>(); // add/merge fields present in first schema for(FieldSchema fs1 : schema1.getFields()){ checkNullAlias(fs1, schema1); - - FieldSchema fs2 = getFieldThrowSchemaMergeException(schema2,fs1.alias); + FieldSchema fs2 = getFieldSubNameMatchThrowSchemaMergeException(schema2,fs1.alias); + if(fs2 != null){ + schema2colsAdded.add(fs2); + } FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2); mergedSchema.add(mergedFs); } - + //add schemas from 2nd schema, that are not already present in // merged schema for(FieldSchema fs2 : schema2.getFields()){ checkNullAlias(fs2, schema2); - if(getFieldThrowSchemaMergeException(mergedSchema, fs2.alias) == null){ - try { - mergedSchema.add(fs2.clone()); - } catch (CloneNotSupportedException e) { - throw new SchemaMergeException( - "Error encountered while merging schemas", e); - } + if(! 
schema2colsAdded.contains(fs2)){ + try { + mergedSchema.add(fs2.clone()); + } catch (CloneNotSupportedException e) { + throw new SchemaMergeException( + "Error encountered while merging schemas", e); + } } } return mergedSchema; - + } - + private static void checkNullAlias(FieldSchema fs, Schema schema) throws SchemaMergeException { if(fs.alias == null){ @@ -1655,6 +1729,8 @@ public class Schema implements Serializa Schema innerSchema = null; + String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias); + byte mergedType = DataType.mergeType(fs1.type, fs2.type) ; // If the types cannot be merged @@ -1684,7 +1760,7 @@ public class Schema implements Serializa } } try { - return new FieldSchema(fs1.alias, innerSchema, mergedType) ; + return new FieldSchema(alias, innerSchema, mergedType) ; } catch (FrontendException e) { // this exception is not expected int errCode = 2124; @@ -1698,6 +1774,29 @@ public class Schema implements Serializa /** + * If one of the aliases is of form 'nm::str1', and other is of the form + * 'str1', this returns str1 + * @param alias1 + * @param alias2 + * @return merged alias + * @throws SchemaMergeException + */ + private static String mergeNameSpacedAlias(String alias1, String alias2) + throws SchemaMergeException { + if(alias1.equals(alias2)){ + return alias1; + } + if(alias1.endsWith("::" + alias2)){ + return alias2; + } + if(alias2.endsWith("::" + alias1)){ + return alias1; + } + //the aliases are different, alias cannot be merged + return null; + } + + /** * Utility function that calls schema.getFiled(alias), and converts * {...@link FrontendException} to {...@link SchemaMergeException} * @param schema @@ -1705,13 +1804,13 @@ public class Schema implements Serializa * @return FieldSchema * @throws SchemaMergeException */ - private static FieldSchema getFieldThrowSchemaMergeException( + private static FieldSchema getFieldSubNameMatchThrowSchemaMergeException( Schema schema, String alias) throws SchemaMergeException { FieldSchema fs = 
null; try { - fs = schema.getField(alias); + fs = schema.getFieldSubNameMatch(alias); } catch (FrontendException e) { - String msg = "Caught exception finding FieldSchema for alias" + + String msg = "Caught exception finding FieldSchema for alias " + alias; throw new SchemaMergeException(msg, e); } Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java?rev=997957&r1=997956&r2=997957&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java Fri Sep 17 00:35:44 2010 @@ -17,6 +17,7 @@ */ package org.apache.pig.test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.File; @@ -28,13 +29,17 @@ import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -127,7 +132,148 @@ public class TestUnionOnSchema { } + /** + * Test UNION ONSCHEMA where a common column has additional 'namespace' part + * in the column name in one of the inputs + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaScopedColumnName() throws IOException, 
ParseException { + PigServer pig = new PigServer(ExecType.LOCAL); + String query_prefix = + " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + + "g = group l1 by i; " + + "f = foreach g generate flatten(l1); " + + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "; + + String query = query_prefix + "u = union onschema f, l2; " ; + Util.registerMultiLineQuery(pig, query); + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = Util.getSchemaFromString("i: int, j: int"); + assertEquals("Checking expected schema",sch, expectedSch); + Iterator<Tuple> it = pig.openIterator("u"); + + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,2)", + "(5,3)", + "(1,2)", + "(5,3)" + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + + // now try reversing the order of relation + query = query_prefix + "u = union onschema l2, f; " ; + Util.registerMultiLineQuery(pig, query); + sch = pig.dumpSchema("u"); + expectedSch = Util.getSchemaFromString("i: int, j: int"); + assertEquals("Checking expected schema",sch, expectedSch); + it = pig.openIterator("u"); + Util.checkQueryOutputsAfterSort(it, expectedRes); + + } + /** + * Test UNION ONSCHEMA where a common column has additional 'namespace' part + * in the column name in both the inputs + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaScopedColumnNameBothInp1() throws IOException, ParseException { + PigServer pig = new PigServer(ExecType.LOCAL); + String query = + " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + + "g1 = group l1 by i; " + + "f1 = foreach g1 generate group as gkey, flatten(l1); " + + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); " + + "g2 = group l2 by i; " + + "f2 = foreach g2 generate group as gkey, flatten(l2); " + + "u = union onschema f1, f2; " ; + Util.registerMultiLineQuery(pig, query); + + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = + 
Util.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray"); + assertEquals("Checking expected schema",sch, expectedSch); + + Iterator<Tuple> it = pig.openIterator("u"); + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,1,2,null,null)", + "(5,5,3,null,null)", + "(1,null,null,1,'2')", + "(5,null,null,5,'3')" + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + } + + /** + * Test UNION ONSCHEMA where a common column has additional 'namespace' part + * in the column name in both the inputs + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaScopedColumnNameBothInp2() throws IOException, ParseException { + PigServer pig = new PigServer(ExecType.LOCAL); + String query = + " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + + " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); " + + " cg1 = cogroup l1 by i, l2 by i; " + + " f1 = foreach cg1 generate group as gkey, flatten(l1), flatten(l2); " + + " cg2 = cogroup l2 by i, l1 by i; " + + " f2 = foreach cg1 generate group as gkey, flatten(l2), flatten(l1); " + + "u = union onschema f1, f2; " ; + Util.registerMultiLineQuery(pig, query); + + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = + Util.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray"); + assertEquals("Checking expected schema",sch, expectedSch); + + Iterator<Tuple> it = pig.openIterator("u"); + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,1,2,1,'2')", + "(5,5,3,5,'3')", + "(1,1,2,1,'2')", + "(5,5,3,5,'3')", + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + + } + + /** + * Test UNION ONSCHEMA where a common column has additional 'namespace' part + * in the column name in one of the inputs. 
+ * Negative test case + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaScopedColumnNameNeg() throws IOException, ParseException { + + String expectedErr = "Found more than one match: l1::i, l2::i"; + String query_prefix = + " l1 = load '/tmp/fn' as (i : int, j : long); " + + "l2 = load '/tmp/fn' as (i : int, j : long); " + + "cg = cogroup l1 by i, l2 by i;" + + "f = foreach cg generate flatten(l1), flatten(l2); " + + "l3 = load '/tmp/fn2' as (i : int, j : long); " + ; + String query = query_prefix + "u = union onschema f, l3; "; + checkSchemaEx(query, expectedErr); + + // now try reversing the order of relation + query = query_prefix + "u = union onschema l3, f; "; + checkSchemaEx(query, expectedErr); + + } /** * Test UNION ONSCHEMA on two inputs with same column names, but different @@ -423,9 +569,108 @@ public class TestUnionOnSchema { } + /** + * Test UNION ONSCHEMA with input relation having udfs + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaInputUdfs() throws IOException, ParseException { + PigServer pig = new PigServer(ExecType.LOCAL); + String query = + " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);" + + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);" + + "f1 = foreach l1 generate i, CONCAT(j,j) as cj, " + + "org.apache.pig.test.TestUnionOnSchema\\$UDFTupleNullSchema(i,j) as uo;" + + "u = union onschema f1, l2;" + ; + Util.registerMultiLineQuery(pig, query); + + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = + Util.getSchemaFromString("i: int, cj: chararray, uo: Tuple(), j: chararray"); + assertEquals("Checking expected schema",sch, expectedSch); + + Iterator<Tuple> it = pig.openIterator("u"); + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,null,null,'2')", + "(5,null,null,'3')", + "(1,'22',(1,'2'),null)", + "(5,'33',(5,'3'),null)" + }); + 
Util.checkQueryOutputsAfterSort(it, expectedRes); + + } + + /** + * Udf that has schema of tuple column with no inner schema + */ + public static class UDFTupleNullSchema extends EvalFunc <Tuple> { + public Tuple exec(Tuple input) { + return input; + } + + @Override + public Schema outputSchema(Schema input) { + FieldSchema fs = + new Schema.FieldSchema(getSchemaName("UDFTupleNullSchema", input), + DataType.TUPLE); + return new Schema(fs); + } + + } + /** + * Test UNION ONSCHEMA with input relation having column names with multiple + * level of namespace in their names + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaScopeMulti() throws IOException, ParseException { + PigServer pig = new PigServer(ExecType.LOCAL); + String query_prefix = + " a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); " + + "b = group a by i; " + + "c = foreach b generate group as gp, flatten(a); " + + "d = group c by $0; " + + "e = foreach d generate group as gp, flatten(c); " + + "f = load '" + INP_FILE_2NUMS + "' as (i, j); " + ; + String query = query_prefix + "u = union onschema e,f;"; + Util.registerMultiLineQuery(pig, query); + + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = + Util.getSchemaFromString("gp: int,c::gp: int,i: int,j: int"); + assertEquals("Checking expected schema",sch, expectedSch); + + + query = query_prefix + "u = union onschema f,e;"; + Util.registerMultiLineQuery(pig, query); + + sch = pig.dumpSchema("u"); + expectedSch = + Util.getSchemaFromString("i: int,j: int, gp: int,c::gp: int"); + assertEquals("Checking expected schema",sch, expectedSch); + + + Iterator<Tuple> it = pig.openIterator("u"); + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,2,null,null)", + "(5,3,null,null)", + "(1,2,1,1)", + "(5,3,5,5)", + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + + } }