Author: olga
Date: Sat Feb 20 02:26:04 2010
New Revision: 912064

URL: http://svn.apache.org/viewvc?rev=912064&view=rev
Log:
PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
UDF (yinghe via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Feb 20 02:26:04 2010
@@ -130,6 +130,9 @@
 
 BUG FIXES
 
+PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
+UDF (yinghe via olgan)
+
 PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc)
 
 PIG-1216: New load store design does not allow Pig to validate inputs and

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
 Sat Feb 20 02:26:04 2010
@@ -155,7 +155,7 @@
         }
         
         if (po instanceof POMapLookUp) {
-            return true;
+            return check(po.getInputs().get(0));
         }
         
         if (po instanceof POProject) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Sat Feb 20 02:26:04 2010
@@ -457,9 +457,12 @@
         EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
         checker.visit();
         
-        AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
-        accum.visit();
-        
+        boolean isAccum = 
+            "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true"));
+        if (isAccum) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+            accum.visit();
+        }
         return plan;
     }
     

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=912064&r1=912063&r2=912064&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sat Feb 20 
02:26:04 2010
@@ -33,6 +33,7 @@
     private static final String INPUT_FILE = "AccumulatorInput.txt";
     private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
     private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+    private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
  
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -50,6 +51,7 @@
     
     @Before
     public void setUp() throws Exception {
+        pigServer.getPigContext().getProperties().remove("opt.accumulator");
         createFiles();
     }
 
@@ -94,6 +96,16 @@
         w.close();   
         
         Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE4));
+        
+        w.println("100\thttp://ibm.com,ibm");          
+        w.println("100\thttp://ibm.com,ibm");
+        w.println("200\thttp://yahoo.com,yahoo");      
+        w.println("300\thttp://sun.com,sun");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
     }
     
     @After
@@ -103,7 +115,9 @@
         new File(INPUT_FILE2).delete();
         Util.deleteFile(cluster, INPUT_FILE2);
         new File(INPUT_FILE3).delete();
-        Util.deleteFile(cluster, INPUT_FILE3);
+        Util.deleteFile(cluster, INPUT_FILE3);        
+        new File(INPUT_FILE4).delete();
+        Util.deleteFile(cluster, INPUT_FILE4);
     }
     
     public void testAccumBasic() throws IOException{
@@ -486,17 +500,58 @@
         }    
     }
 
-       // Pig 1105
+    // Pig 1105
     public void testAccumCountStar() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
         pigServer.registerQuery("C = group A by id;");
         pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
 
-               try {
-                       Iterator<Tuple> iter = pigServer.openIterator("D");
-               } catch (Exception e) {
-                       fail("COUNT_STAR should be supported by accumulator interface");
-               }      
-       }
-       
+        try {
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+        } catch (Exception e) {
+            fail("COUNT_STAR should be supported by accumulator interface");
+        }      
+    }
+    
+    
+    public void testAccumulatorOff() throws IOException{
+        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
+        
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+        
+        try {
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            int c = 0;
+            while(iter.hasNext()) {
+                iter.next();
+                c++;
+            }
+            fail("Accumulator should be off.");
+        }catch(Exception e) {
+            // we should get exception
+        }
+        
+    }              
+    
+    public void testAccumWithMap() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
+        pigServer.registerQuery("B = group A by (id, url);");
+        pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");                     
+
+        HashMap<Integer, String> expected = new HashMap<Integer, String>();
+        expected.put(2, "http://ibm.com");
+        expected.put(1, "http://yahoo.com");
+        expected.put(1, "http://sun.com");        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));      
+        }                                   
+    }        
+    
+
 }


Reply via email to