Author: pradeepkth Date: Tue Mar 3 23:48:43 2009 New Revision: 749846 URL: http://svn.apache.org/viewvc?rev=749846&view=rev Log: PIG-577: outer join query loses name information (sms via pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/Main.java hadoop/pig/trunk/src/org/apache/pig/data/DataType.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Mar 3 23:48:43 2009 @@ -442,3 +442,5 @@ PIG-691: BinStorage skips tuples when ^A is present in data (pradeepkth via sms) + + PIG-577: outer join query looses name information (sms via pradeepkth) Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/Main.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/Main.java Tue Mar 3 23:48:43 2009 @@ -75,7 +75,7 @@ boolean verbose = false; boolean gruntCalled = false; - String logFileName = validateLogFile(null, null); + String logFileName = null; try { BufferedReader pin = null; @@ -236,6 +236,11 @@ configureLog4J(properties); // create the context with the parameter PigContext pigContext = new PigContext(execType, properties); + + if(logFileName == null) { + logFileName = validateLogFile(null, null); + } + pigContext.getProperties().setProperty("pig.logfile", logFileName); LogicalPlanBuilder.classloader = pigContext.createCl(null); @@ -507,7 +512,7 @@ if(logFile.isDirectory()) { if(logFile.canWrite()) 
{ try { - logFileName += logFile.getCanonicalPath() + File.separator + defaultLogFileName; + logFileName = logFile.getCanonicalPath() + File.separator + defaultLogFileName; } catch (IOException ioe) { throw new AssertionError("Could not compute canonical path to the log file " + ioe.getMessage()); } @@ -556,8 +561,8 @@ //revert to the current working directory String currDir = System.getProperty("user.dir"); logFile = new File(currDir); - if(logFile.canWrite()) { - logFileName = currDir + File.separator + (logFileName == null? defaultLogFileName : logFileName); + logFileName = currDir + File.separator + (logFileName == null? defaultLogFileName : logFileName); + if(logFile.canWrite()) { return logFileName; } throw new RuntimeException("Cannot write to log file: " + logFileName); Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Tue Mar 3 23:48:43 2009 @@ -873,6 +873,7 @@ if((null == currSchema) || (currSchema.size() != schemaSize)) { Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, null, TUPLE); Schema bagSchema = new Schema(tupleFs); + bagSchema.setTwoLevelAccessRequired(true); return new Schema.FieldSchema(null, bagSchema, BAG); } schema = Schema.mergeSchema(schema, currSchema, false, false, false); Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Mar 3 23:48:43 2009 @@ -3065,6 +3065,10 @@ ( <BAG> "{" (fs = TypeSchemaTuple() | {} {fs = new Schema.FieldSchema(null, new Schema());}) "}" ) { s = new Schema(fs); + // since this schema has tuple field schema which internally + // has a list of field schemas for the actual items in the bag + // an access to any field in the bag is a two level access + s.setTwoLevelAccessRequired(true); if (null != t1) { log.debug("BAG alias " + t1.image); fs = new Schema.FieldSchema(t1.image, s, DataType.BAG); Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Mar 3 23:48:43 2009 @@ -362,7 +362,7 @@ // Don't do the comparison if both embedded schemas are // null. That will cause Schema.equals to return false, // even though we want to view that as true. - if (!(fschema.schema == null && fother.schema == null)) { + if (!(fschema.schema == null && fother.schema == null)) { // compare recursively using schema if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) { return false ; @@ -1155,6 +1155,32 @@ if (other == null) { return false ; } + + /* + * Need to check for bags with schemas and bags with tuples that in turn have schemas. 
+ * Retrieve the tuple schema of the bag if twoLevelAccessRequired + * Assuming that only bags exhibit this behavior and twoLevelAccessRequired is used + * with the right intentions + */ + if(schema.isTwoLevelAccessRequired() || other.isTwoLevelAccessRequired()) { + if(schema.isTwoLevelAccessRequired()) { + try { + schema = schema.getField(0).schema; + } catch (FrontendException fee) { + return false; + } + } + + if(other.isTwoLevelAccessRequired()) { + try { + other = other.getField(0).schema; + } catch (FrontendException fee) { + return false; + } + } + + return Schema.equals(schema, other, relaxInner, relaxAlias); + } if (schema.size() != other.size()) return false; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Mar 3 23:48:43 2009 @@ -1898,6 +1898,7 @@ DataType.TUPLE); Schema bagSchema = new Schema(tupleFs); + bagSchema.setTwoLevelAccessRequired(true); Schema.FieldSchema bagFs = new Schema.FieldSchema( "bag_of_tokenTuples",bagSchema, DataType.BAG); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java Tue Mar 3 23:48:43 2009 @@ -77,9 +77,9 @@ Schema schema2 = new Schema(list2) ; Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ; - + innerList2.get(1).alias = "pi" ; - + 
Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ; Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ; @@ -567,5 +567,63 @@ // Compare Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ; } + + @Test + public void testSchemaEqualTwoLevelAccess() throws Exception { + + List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ; + innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ; + innerList1.add(new FieldSchema("11b", DataType.LONG)) ; + + List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ; + innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ; + innerList2.add(new FieldSchema("11b", DataType.LONG)) ; + + Schema innerSchema1 = new Schema(innerList1) ; + Schema innerSchema2 = new Schema(innerList2) ; + + List<FieldSchema> list1 = new ArrayList<FieldSchema>() ; + list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ; + list1.add(new FieldSchema("1b", innerSchema1)) ; + list1.add(new FieldSchema("1c", DataType.INTEGER)) ; + + List<FieldSchema> list2 = new ArrayList<FieldSchema>() ; + list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ; + list2.add(new FieldSchema("1b", innerSchema2)) ; + list2.add(new FieldSchema("1c", DataType.INTEGER)) ; + + Schema schema1 = new Schema(list1) ; + Schema schema2 = new Schema(list2) ; + + Schema.FieldSchema bagFs1 = new Schema.FieldSchema("b", schema1, DataType.BAG); + Schema bagSchema1 = new Schema(bagFs1); + + Schema.FieldSchema tupleFs = new Schema.FieldSchema("t", schema2, DataType.TUPLE); + Schema bagSchema = new Schema(tupleFs); + bagSchema.setTwoLevelAccessRequired(true); + Schema.FieldSchema bagFs2 = new Schema.FieldSchema("b", bagSchema, DataType.BAG); + Schema bagSchema2 = new Schema(bagFs2); + + + Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, false)) ; + + innerList2.get(1).alias = "pi" ; + + Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ; + Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, 
true)) ; + + innerList2.get(1).alias = "11b" ; + innerList2.get(1).type = DataType.BYTEARRAY ; + + Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ; + Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, true, false)) ; + + innerList2.get(1).type = DataType.LONG ; + + Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, false)) ; + + list2.get(0).type = DataType.CHARARRAY ; + Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ; + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=749846&r1=749845&r2=749846&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Mar 3 23:48:43 2009 @@ -5557,28 +5557,28 @@ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage")); } - + @Test public void testBincond() throws Throwable { planTester.buildPlan("a = load 'a' as (name: chararray, age: int, gpa: float);") ; planTester.buildPlan("b = group a by name;") ; LogicalPlan plan = planTester.buildPlan("c = foreach b generate (IsEmpty(a) ? 
" + TestBinCondFieldSchema.class.getName() + "(*): a) ;") ; - + // validate CompilationMessageCollector collector = new CompilationMessageCollector() ; TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; typeValidator.validate(plan, collector) ; - + printMessageCollector(collector) ; printTypeGraph(plan) ; planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName()); - + if (collector.hasError()) { throw new AssertionError("Did not expect an error") ; } - - + + LOForEach foreach = (LOForEach)plan.getLeaves().get(0); Schema.FieldSchema charFs = new FieldSchema(null, DataType.CHARARRAY); @@ -5598,7 +5598,35 @@ Schema expectedSchema = new Schema(bagFs); assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, true)); + + } + @Test + public void testBinCondForOuterJoin() throws Throwable { + planTester.buildPlan("a = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);"); + planTester.buildPlan("b = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);"); + planTester.buildPlan("c = COGROUP a BY name, b BY name;"); + LogicalPlan plan = planTester.buildPlan("d = FOREACH c GENERATE group, flatten((not IsEmpty(a) ? a : (bag{tuple(chararray, int, float)}){(null, null, null)})), flatten((not IsEmpty(b) ? 
b : (bag{tuple(chararray, int, chararray, float)}){(null,null,null, null)}));"); + + // validate + CompilationMessageCollector collector = new CompilationMessageCollector() ; + TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; + typeValidator.validate(plan, collector) ; + + printMessageCollector(collector) ; + printTypeGraph(plan) ; + planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName()); + + if (collector.hasError()) { + throw new AssertionError("Expect no error") ; + } + + + LOForEach foreach = (LOForEach)plan.getLeaves().get(0); + String expectedSchemaString = "mygroup: chararray,A::name: chararray,A::age: int,A::gpa: float,B::name: chararray,B::age: int,B::registration: chararray,B::contributions: float"; + Schema expectedSchema = Util.getSchemaFromString(expectedSchemaString); + assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, true)); + } /*