Author: olga
Date: Thu Dec  3 18:46:58 2009
New Revision: 886879

URL: http://svn.apache.org/viewvc?rev=886879&view=rev
Log:
PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan)

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=886879&r1=886878&r2=886879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Thu Dec  3 18:46:58 2009
@@ -127,6 +127,8 @@
 
 BUG FIXES
 
+PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan)
+
 PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)
 
 PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886879&r1=886878&r2=886879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Dec  3 18:46:58 2009
@@ -270,6 +270,9 @@
         PhysicalOperator leaf = mr.mapPlan.getLeaves().get(0);
         pl.remove(leaf);
         
+        POStore store = (POStore)leaf;
+        String ofile = store.getSFile().getFileName();
+        
         // then connect the remaining map plan to the successor of
         // each root (load) operator of the splittee
         for (MapReduceOper succ : succs) {
@@ -277,6 +280,11 @@
             ArrayList<PhysicalOperator> rootsCopy = 
                 new ArrayList<PhysicalOperator>(roots);
             for (PhysicalOperator op : rootsCopy) {
+                POLoad load = (POLoad)op;
+                String ifile = load.getLFile().getFileName();
+                if (ofile.compareTo(ifile) != 0) {
+                    continue;
+                }
                 PhysicalOperator opSucc = succ.mapPlan.getSuccessors(op).get(0);
                 PhysicalPlan clone = null;
                 try {

Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java?rev=886879&r1=886878&r2=886879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java Thu Dec  3 18:46:58 2009
@@ -175,6 +175,75 @@
     }
     
     @Test
+    public void testMultiQueryJiraPig1113() {
+
+        // test case: Diamond query optimization throws error in JOIN
+
+        String INPUT_FILE_1 = "set1.txt";
+        String INPUT_FILE_2 = "set2.txt";
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_1));
+            w.println("login\t0\tjar");
+            w.println("login\t1\tbox");
+            w.println("quit\t0\tmany");
+            w.close();
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE_1, INPUT_FILE_1);
+
+            PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE_2));
+            w2.println("apple\tlogin\t{(login)}");
+            w2.println("orange\tlogin\t{(login)}");
+            w2.println("strawberry\tquit\t{(login)}");
+            w2.close();
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
+            
+            myPig.setBatchOn();
+
+            myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+                    + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
+            myPig.registerQuery("set2 = load '" + INPUT_FILE_2
+                    + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
+            myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+                    + "(chararray) 0 as f3;");
+            myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
+                    + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
+            myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
+            myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
+          
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "('quit','0','many','strawberry','quit','0')",
+                            "('login','0','jar','apple','login','0')",
+                            "('login','0','jar','orange','login','0')",
+                            "('login','1','box','apple','login','1')",
+                            "('login','1','box','orange','login','1')",
+                            "('login','1','box','strawberry','login','1')"
+                    });
+            
+            Iterator<Tuple> iter = myPig.openIterator("joined_sets");
+            int count = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
+            }
+            assertEquals(expectedResults.size(), count);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE_1).delete();
+            new File(INPUT_FILE_2).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE_1);
+                Util.deleteFile(cluster, INPUT_FILE_2);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
+    @Test
     public void testMultiQueryJiraPig1060() {
 
         // test case: 


Reply via email to