Author: olga
Date: Thu Dec 3 18:36:06 2009
New Revision: 886875

URL: http://svn.apache.org/viewvc?rev=886875&view=rev
Log:
PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan)

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886875&r1=886874&r2=886875&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Dec 3 18:36:06 2009 @@ -45,6 +45,8 @@ BUG FIXES +PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan) + PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth) PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886875&r1=886874&r2=886875&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Dec 3 18:36:06 2009 @@ -270,6 +270,9 @@ PhysicalOperator leaf = mr.mapPlan.getLeaves().get(0); pl.remove(leaf); + POStore store = (POStore)leaf; + String ofile = store.getSFile().getFileName(); + // then connect the remaining map plan to the successor of // each root (load) operator of the splittee for (MapReduceOper succ : succs) { @@ -277,6 +280,11 @@ ArrayList<PhysicalOperator> rootsCopy = new ArrayList<PhysicalOperator>(roots); for (PhysicalOperator op : rootsCopy) { + POLoad load = (POLoad)op; + String ifile = load.getLFile().getFileName(); + if (ofile.compareTo(ifile) != 0) { + 
continue; + } PhysicalOperator opSucc = succ.mapPlan.getSuccessors(op).get(0); PhysicalPlan clone = null; try { Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=886875&r1=886874&r2=886875&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Thu Dec 3 18:36:06 2009 @@ -175,6 +175,75 @@ } @Test + public void testMultiQueryJiraPig1113() { + + // test case: Diamond query optimization throws error in JOIN + + String INPUT_FILE_1 = "set1.txt"; + String INPUT_FILE_2 = "set2.txt"; + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_1)); + w.println("login\t0\tjar"); + w.println("login\t1\tbox"); + w.println("quit\t0\tmany"); + w.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE_1, INPUT_FILE_1); + + PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE_2)); + w2.println("apple\tlogin\t{(login)}"); + w2.println("orange\tlogin\t{(login)}"); + w2.println("strawberry\tquit\t{(login)}"); + w2.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2); + + myPig.setBatchOn(); + + myPig.registerQuery("set1 = load '" + INPUT_FILE_1 + + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);"); + myPig.registerQuery("set2 = load '" + INPUT_FILE_2 + + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});"); + myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, " + + "(chararray) 0 as f3;"); + myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, " + + "FLATTEN((IsEmpty(c) ? 
null : c)) as f2, (chararray) 1 as f3;"); + myPig.registerQuery("all_set2 = UNION set2_1, set2_2;"); + myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);"); + + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('quit','0','many','strawberry','quit','0')", + "('login','0','jar','apple','login','0')", + "('login','0','jar','orange','login','0')", + "('login','1','box','apple','login','1')", + "('login','1','box','orange','login','1')", + "('login','1','box','strawberry','login','1')" + }); + + Iterator<Tuple> iter = myPig.openIterator("joined_sets"); + int count = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(count++).toString(), iter.next().toString()); + } + assertEquals(expectedResults.size(), count); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE_1).delete(); + new File(INPUT_FILE_2).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE_1); + Util.deleteFile(cluster, INPUT_FILE_2); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + @Test public void testMultiQueryJiraPig1060() { // test case: