Author: sms
Date: Fri Jan 30 03:58:07 2009
New Revision: 739161

URL: http://svn.apache.org/viewvc?rev=739161&view=rev
Log:
PIG-646: Distinct UDF should report progress

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jan 30 03:58:07 2009
@@ -390,3 +390,5 @@
     PIG-631: 4 Unit test failures on Windows (daijy)
 
     PIG-645:  Streaming is broken with the latest trunk (pradeepkth)
+
+    PIG-646: Distinct UDF should report progress (sms)

Modified: hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java Fri Jan 30 03:58:07 2009
@@ -116,7 +116,7 @@
     }
         
     // report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple)
-    protected void progress() { 
+    public final void progress() {
         if (reporter != null) reporter.progress();
         else log.warn("No reporter object provided to UDF " + this.getClass().getName());
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Jan 30 03:58:07 2009
@@ -127,6 +127,7 @@
                 Reporter reporter) throws IOException {
             
             pigReporter.setRep(reporter);
+            PhysicalOperator.setReporter(pigReporter);
             
             // In the case we optimize, we combine
             // POPackage and POForeach - so we could get many

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Jan 30 03:58:07 2009
@@ -388,6 +388,7 @@
             // which could additionally be called from close()
             this.outputCollector = oc;
             pigReporter.setRep(reporter);
+            PhysicalOperator.setReporter(pigReporter);
             
             // If the keyType is not a tuple, the MapWithComparator.collect()
             // would have wrapped the key into a tuple so that the 

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Distinct.java Fri Jan 30 03:58:07 2009
@@ -99,7 +99,7 @@
          */
         @Override
         public Tuple exec(Tuple input) throws IOException {
-            return tupleFactory.newTuple(getDistinctFromNestedBags(input));
+            return tupleFactory.newTuple(getDistinctFromNestedBags(input, this));
         }
     }
 
@@ -110,12 +110,13 @@
          */
         @Override
         public DataBag exec(Tuple input) throws IOException {
-            return getDistinctFromNestedBags(input);
+            return getDistinctFromNestedBags(input, this);
         }
     }
     
-    static private DataBag getDistinctFromNestedBags(Tuple input) throws IOException {
+    static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException {
         DataBag result = bagFactory.newDistinctBag();
+        long progressCounter = 0;
         try {
             DataBag bg = (DataBag)input.get(0);
             for (Tuple tuple : bg) {
@@ -124,6 +125,10 @@
                 // and distinct over all tuples
                 for (Tuple t : (DataBag)tuple.get(0)) {
                     result.add(t);
+                    ++progressCounter;
+                    if((progressCounter % 1000) == 0){                      
+                        evalFunc.progress();
+                    }
                 }
             }
         } catch (ExecException e) {
@@ -132,12 +137,17 @@
         return result;
     }
     
-    static protected DataBag getDistinct(Tuple input) throws IOException {
+    protected DataBag getDistinct(Tuple input) throws IOException {
         try {
             DataBag inputBg = (DataBag)input.get(0);
             DataBag result = bagFactory.newDistinctBag();
+            long progressCounter = 0;
             for (Tuple tuple : inputBg) {
                 result.add(tuple);
+                ++progressCounter;
+                if ((progressCounter % 1000) == 0) {
+                    progress();
+                }
             }
             return result;
         } catch (ExecException e) {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Jan 30 03:58:07 2009
@@ -940,6 +940,33 @@
         DataBag expectedBag = Util.createBagOfOneColumn(exp);
         assertEquals(expectedBag, result);
         
+    }    
+
+    @Test
+    public void testDistinctProgressNonAlgebraic() throws Exception {
+
+        //This test is for the exec method in Distinct which is not
+        //called currently.
+
+        int inputSize = 2002;
+        Integer[] inp = new Integer[inputSize];
+        for(int i = 0; i < inputSize; i+=2) {
+            inp[i] = i/2;
+            inp[i+1] = i/2;
+        }
+
+        DataBag inputBag = Util.createBagOfOneColumn(inp);
+        EvalFunc<DataBag> distinct = new Distinct();
+        DataBag result = distinct.exec(tupleFactory.newTuple(inputBag));
+        
+        Integer[] exp = new Integer[inputSize/2];
+        for(int j = 0; j < inputSize/2; ++j) {
+            exp[j] = j;
+        }
+
+        DataBag expectedBag = Util.createBagOfOneColumn(exp);
+        assertEquals(expectedBag, result);
+        
     }
     
     @Test

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=739161&r1=739160&r2=739161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri Jan 30 03:58:07 2009
@@ -42,6 +42,7 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.Distinct;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.TextLoader;
 import org.apache.pig.data.*;
@@ -327,13 +328,13 @@
     
 
     
-    /*
+    /*    
     @Test
     public void testSort() throws Exception{
         testSortDistinct(false, false);
-    }
-    */
-    
+    }    
+    */    
+
     @Test
     public void testSortWithUDF() throws Exception{
         testSortDistinct(false, true);
@@ -1043,6 +1044,49 @@
         Util.deleteFile(cluster, "table1");
         Util.deleteFile(cluster, "table2");
     }
+
+    @Test
+    public void testAlgebraicDistinctProgress() throws Exception {
     
+        //creating a test input of larger than 1000 to make
+        //sure that progress kicks in. The only way to test this 
+        //is to add a log statement to the getDistinct
+        //method in Distinct.java. There is no automated mechanism
+        //to check this from pig
+        int inputSize = 4004;
+        Integer[] inp = new Integer[inputSize];
+        String[] inpString = new String[inputSize];
+        for(int i = 0; i < inputSize; i+=2) {
+            inp[i] = i/2;
+            inp[i+1] = i/2;
+            inpString[i] = new Integer(i/2).toString();
+            inpString[i+1] = new Integer(i/2).toString();
+        }
+        
+       
+        DataBag inputBag = Util.createBagOfOneColumn(inp);
+        Util.createInputFile(cluster, "table", inpString);
+
+        pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
+        pigServer.registerQuery("b = group a ALL;");
+        pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
+        Iterator<Tuple> it = pigServer.openIterator("c");
+     
+        Integer[] exp = new Integer[inputSize/2];
+        for(int j = 0; j < inputSize/2; ++j) {
+            exp[j] = j;
+        }
+
+        DataBag expectedBag = Util.createBagOfOneColumn(exp);
+        
+        while(it.hasNext()) {
+            Tuple tup = it.next();
+            Long resultBagSize = (Long)tup.get(0);
+            assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
+        }
+        
+        Util.deleteFile(cluster, "table");        
+    }
+
 
 }


Reply via email to