Author: olga Date: Thu Dec 3 01:57:41 2009 New Revision: 886650 URL: http://svn.apache.org/viewvc?rev=886650&view=rev Log: PIG-1114: MultiQuery optimization throws error when merging 2 level splits (rding via olgan)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886650&r1=886649&r2=886650&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Dec 3 01:57:41 2009 @@ -45,6 +45,9 @@ BUG FIXES +PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding +via olgan) + PIG-1108: Incorrect map output key type in MultiQuery optimiza (rding via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886650&r1=886649&r2=886650&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Dec 3 01:57:41 2009 @@ -552,7 +552,7 @@ return sameKeyType; } - private int setIndexOnLRInSplit(int initial, POSplit splitOp) + private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType) throws VisitorException { int index = initial; @@ -568,9 +568,15 @@ String msg = "Internal Error. 
Unable to set multi-query index for optimization."; throw new OptimizerException(msg, errCode, PigException.BUG, e); } + + // change the map key type to tuple when + // multiple splittees have different map key types + if (!sameKeyType) { + lr.setKeyType(DataType.TUPLE); + } } else if (leaf instanceof POSplit) { POSplit spl = (POSplit)leaf; - index = setIndexOnLRInSplit(index, spl); + index = setIndexOnLRInSplit(index, spl, sameKeyType); } } @@ -615,7 +621,7 @@ // across all POLocalRearranges in all merged map plans // including nested ones in POSplit POSplit spl = (POSplit)leaf; - curIndex = setIndexOnLRInSplit(index, spl); + curIndex = setIndexOnLRInSplit(index, spl, sameKeyType); } splitOp.addPlan(pl); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=886650&r1=886649&r2=886650&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Thu Dec 3 01:57:41 2009 @@ -116,6 +116,65 @@ } @Test + public void testMultiQueryJiraPig1114() { + + // test case: MultiQuery optimization throws error when merging 2 level splits + + String INPUT_FILE = "data.txt"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("10\tjar"); + w.println("20\tbox"); + w.println("30\tbot"); + w.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("data = load '" + INPUT_FILE + + "' USING PigStorage as (id:int, name:chararray);"); + myPig.registerQuery("ids = FOREACH data GENERATE id;"); + myPig.registerQuery("allId = GROUP ids all;"); + myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;"); + myPig.registerQuery("idGroup = GROUP ids by id;"); + 
myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;"); + myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;"); + myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;"); + myPig.registerQuery("orderedCounts = order idCountTotal by count desc;"); + myPig.registerQuery("STORE orderedCounts INTO '/tmp/output1';"); + + myPig.registerQuery("names = FOREACH data GENERATE name;"); + myPig.registerQuery("allNames = GROUP names all;"); + myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;"); + myPig.registerQuery("nameGroup = GROUP names by name;"); + myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;"); + myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;"); + myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;"); + myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;"); + myPig.registerQuery("STORE nameCountsOrdered INTO '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + @Test public void testMultiQueryJiraPig1060() { // test case: