Author: pradeepkth
Date: Thu Nov 19 17:55:36 2009
New Revision: 882221

URL: http://svn.apache.org/viewvc?rev=882221&view=rev
Log:
PIG-1064: Behaviour of COGROUP with and without schema when using * operator (pradeepkth)

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=882221&r1=882220&r2=882221&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Nov 19 17:55:36 2009 @@ -33,6 +33,9 @@ BUG FIXES +PIG-1064: Behaviour of COGROUP with and without schema when using "*" operator +(pradeepkth) + Release 0.6.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=882221&r1=882220&r2=882221&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Thu Nov 19 17:55:36 2009 @@ -57,7 +57,6 @@ //get the attributes of cogroup that are modified during the trnalsation MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans(); - for(LogicalOperator op: cg.getInputs()) { ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>(); for(LogicalPlan lp: mapGByPlans.get(op)) { @@ -70,9 +69,41 @@ newGByPlans.add(lp); } } + + mapGByPlans.removeKey(op); mapGByPlans.put(op, newGByPlans); } + + // check if after translation none of group by plans in a cogroup + // have a project(*) - if they still do it's because the input + // for the project(*) did not have a schema - in this case, we 
should + // error out since we could have different number/types of + // cogroup keys + if(cg.getInputs().size() > 1) { // only for cogroups + for(LogicalOperator op: cg.getInputs()) { + for(LogicalPlan lp: mapGByPlans.get(op)) { + if(checkPlanForProjectStar(lp)) { + // not following Error handling guidelines to give error code + // and error source since this will get swallowed by the parser + // which will just return a ParseException + throw new VisitorException("Cogroup/Group by * is only allowed if " + + "the input has a schema"); + } + } + } + // check if after translation all group by plans have same arity + int arity = mapGByPlans.get(cg.getInputs().get(0)).size(); + for(LogicalOperator op: cg.getInputs()) { + if(arity != mapGByPlans.get(op).size()) { + // not following Error handling guidelines to give error code + // and error source since this will get swallowed by the parser + // which will just return a ParseException + throw new VisitorException("The arity of cogroup/group by columns " + + "do not match"); + } + } + } } /* (non-Javadoc) Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=882221&r1=882220&r2=882221&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Nov 19 17:55:36 2009 @@ -1029,19 +1029,19 @@ ) { if(null != root) { - log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases); - - //Translate all the project(*) leaves in the plan to a sequence of projections - ProjectStarTranslator translate = new ProjectStarTranslator(lp); - translate.visit(); - - addLogicalPlan(root, lp); - try { - log.debug("Root: " + root.getClass().getName() + " schema: " + 
root.getSchema()); + log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases); + + //Translate all the project(*) leaves in the plan to a sequence of projections + ProjectStarTranslator translate = new ProjectStarTranslator(lp); + translate.visit(); + + addLogicalPlan(root, lp); + + log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema()); } catch(FrontendException fee) { - ParseException pe = new ParseException(fee.getMessage()); - pe.initCause(fee); + ParseException pe = new ParseException(fee.getMessage()); + pe.initCause(fee); throw pe; } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=882221&r1=882220&r2=882221&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Nov 19 17:55:36 2009 @@ -291,7 +291,7 @@ @Test public void testQuery22Fail() { - buildPlan("A = load 'a';"); + buildPlan("A = load 'a' as (a:int, b: double);"); try { buildPlan("B = group A by (*, $0);"); } catch (AssertionFailedError e) { @@ -323,15 +323,50 @@ @Test public void testQuery23Fail() { + buildPlan("A = load 'a' as (a: int, b:double);"); + buildPlan("B = load 'b';"); + boolean exceptionThrown = false; + try { + buildPlan("C = cogroup A by (*, $0), B by ($0, $1);"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " + + "do not match")); + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void testQuery23Fail2() { buildPlan("A = load 'a';"); buildPlan("B = load 'b';"); + boolean exceptionThrown = false; try { - buildPlan("C = group A by (*, $0), B by ($0, $1);"); + buildPlan("C = cogroup A by (*, $0), B by 
($0, $1);"); } catch (AssertionFailedError e) { - assertTrue(e.getMessage().contains("Grouping attributes can either be star (*")); + assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " + + "the input has a schema")); + exceptionThrown = true; } + assertTrue(exceptionThrown); + } + + @Test + public void testQuery23Fail3() { + buildPlan("A = load 'a' as (a: int, b:double);"); + buildPlan("B = load 'b' as (a:int);"); + boolean exceptionThrown = false; + try { + buildPlan("C = cogroup A by *, B by *;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " + + "do not match")); + exceptionThrown = true; + } + assertTrue(exceptionThrown); } + @Test public void testQuery24() { buildPlan("a = load 'a';"); @@ -1642,7 +1677,7 @@ } @Test - public void testQuery110() throws FrontendException, ParseException { + public void testQuery110Fail() throws FrontendException, ParseException { LogicalPlan lp; LOLoad load; LOCogroup cogroup; @@ -1651,13 +1686,16 @@ lp = buildPlan("b = load 'two';"); load = (LOLoad) lp.getLeaves().get(0); - + boolean exceptionThrown = false; + try{ lp = buildPlan("c = cogroup a by $0, b by *;"); - cogroup = (LOCogroup) lp.getLeaves().get(0); - - MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans(); - LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0]; - assertTrue(checkPlanForProjectStar(cogroupPlan) == true); + } catch(AssertionFailedError e) { + assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " + + "the input has a schema")); + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } @@ -2102,6 +2140,37 @@ fail(); } + @Test + public void testCogroupByStarFailure1() { + boolean exceptionThrown = false; + try { + buildPlan(" a = load '1.txt' as (a0:int, a1:int);"); + buildPlan(" b = load '2.txt'; "); + buildPlan("c = cogroup a by *, b by *;"); + } catch (AssertionFailedError e) { + 
assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " + + "the input has a schema")); + exceptionThrown = true; + } + assertEquals("An exception was expected but did " + + "not occur", true, exceptionThrown); + } + + @Test + public void testCogroupByStarFailure2() { + boolean exceptionThrown = false; + try { + buildPlan(" a = load '1.txt' ;"); + buildPlan(" b = load '2.txt' as (b0:int, b1:int); "); + buildPlan("c = cogroup a by *, b by *;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " + + "the input has a schema")); + exceptionThrown = true; + } + assertEquals("An exception was expected but did " + + "not occur", true, exceptionThrown); + } private void printPlan(LogicalPlan lp) { LOPrinter graphPrinter = new LOPrinter(System.err, lp); System.err.println("Printing the logical plan"); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=882221&r1=882220&r2=882221&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Nov 19 17:55:36 2009 @@ -24,6 +24,7 @@ import java.util.List; import java.util.ArrayList; +import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.apache.pig.EvalFunc; @@ -49,7 +50,18 @@ public class TestTypeCheckingValidator extends TestCase { - LogicalPlanTester planTester = new LogicalPlanTester() ; + LogicalPlanTester planTester; + + /* (non-Javadoc) + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + // create a new instance of the plan tester + // for each test so that different tests do not + // interact with each other's plans + 
planTester = new LogicalPlanTester() ; + } private static final String simpleEchoStreamingCommand; static { @@ -3287,77 +3299,19 @@ } @Test - public void testCogroupStarLineageNoSchema() throws Throwable { - planTester.buildPlan("a = load 'a' using BinStorage() ;") ; - planTester.buildPlan("b = load 'b' using PigStorage() ;") ; - planTester.buildPlan("c = cogroup a by *, b by * ;") ; - planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ; - LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ; - - // validate - CompilationMessageCollector collector = new CompilationMessageCollector() ; - TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; - try { - typeValidator.validate(plan, collector) ; - } - catch (PlanValidationException pve) { - //not good - } - - printMessageCollector(collector) ; - printTypeGraph(plan) ; - planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName()); - - if (collector.hasError()) { - throw new AssertionError("Expect no error") ; - } - - - LOForEach foreach = (LOForEach)plan.getLeaves().get(0); - LogicalPlan foreachPlan = foreach.getForEachPlans().get(1); - - LogicalOperator exOp = foreachPlan.getRoots().get(0); - - if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1); - - LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("BinStorage")); - - foreachPlan = foreach.getForEachPlans().get(2); - exOp = foreachPlan.getRoots().get(0); - if(! 
(exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1); - cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0); - assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage")); - - } - - @Test public void testCogroupStarLineageNoSchemaFail() throws Throwable { planTester.buildPlan("a = load 'a' using BinStorage() ;") ; planTester.buildPlan("b = load 'b' using PigStorage() ;") ; - planTester.buildPlan("c = cogroup a by *, b by * ;") ; - planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ; - LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ; - - // validate - CompilationMessageCollector collector = new CompilationMessageCollector() ; - TypeCheckingValidator typeValidator = new TypeCheckingValidator() ; + boolean exceptionThrown = false; try { - typeValidator.validate(plan, collector) ; - fail("Exception expected") ; - } - catch (PlanValidationException pve) { - //not good - } - - printMessageCollector(collector) ; - printTypeGraph(plan) ; - planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName()); - - if (!collector.hasError()) { - throw new AssertionError("Expect error") ; + LogicalPlan lp = planTester.buildPlan("c = cogroup a by *, b by *;"); + } catch(AssertionFailedError e) { + assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " + + "the input has a schema")); + exceptionThrown = true; } - + assertTrue(exceptionThrown); + } @Test