Author: olga
Date: Wed Nov 28 17:40:53 2007
New Revision: 599242

URL: http://svn.apache.org/viewvc?rev=599242&view=rev
Log:
PIG-14: added heartbeat functionality

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed Nov 28 17:40:53 2007
@@ -29,3 +29,5 @@
        PIG-33 Help was commented out - uncommented (olgan)
 
        PIG-31: second half of concurrent mode problem addressed (olgan)
+
+       PIG-14: added heartbeat functionality (olgan)

Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Wed Nov 28 17:40:53 2007
@@ -29,6 +29,7 @@
 import org.apache.pig.impl.eval.StarSpec;
 import org.apache.pig.impl.io.DataBagFileReader;
 import org.apache.pig.impl.io.DataBagFileWriter;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 public class BigDataBag extends DataBag {
@@ -62,7 +63,6 @@
     }
         
     private void writeContentToDisk() throws IOException{
-       
        if (writer==null){
                        File store = File.createTempFile("bag",".dat",tempdir);
                        stores.add(store);
@@ -82,7 +82,6 @@
                writer.close();
                writer = null;
         }
-        
     }
     
     @Override
@@ -177,7 +176,6 @@
     
     @Override
        public Iterator<Tuple> content() {
-       
        if (sortInProgress)
                throw new RuntimeException("Cannot open another iterator: a sort is in progress");
        
@@ -272,11 +270,12 @@
     private class FileMerger implements Iterator<Tuple>{
        PriorityQueue<HeapEntry> heap;
        private final int FANIN_LIMIT = 25;
+        int curCall;
        DataBagFileWriter writer;
        HeapEntry nextEntry;
        
        public FileMerger() throws IOException{
-               
+            numNotifies = 0;
                Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){
                        public int compare(HeapEntry he1, HeapEntry he2){
                                try
@@ -317,6 +316,14 @@
        }
        
        private void getNextEntry() throws IOException{
+            if (curCall < notifyInterval - 1)
+                curCall ++;
+            else{
+                if (PigMapReduce.reporter != null)
+                    PigMapReduce.reporter.progress();
+                curCall = 0;
+                numNotifies ++;
+            }
                if (heap.isEmpty()){
                        nextEntry = null;
                        writer.close();

Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Wed Nov 28 17:40:53 2007
@@ -27,6 +27,7 @@
 
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 /**
@@ -114,9 +115,18 @@
         
         Collections.sort(content);
         isSorted = true;
+        int curCall = 0;
         
         Tuple lastTup = null;
         for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
+            if (curCall < notifyInterval - 1)
+                curCall++;
+            else
+            {
+                    if (PigMapReduce.reporter != null)
+                        PigMapReduce.reporter.progress(); 
+                    curCall = 0;
+            }
             Tuple thisTup = it.next();
             
             if (lastTup == null) {
@@ -132,8 +142,37 @@
         }
     }
 
+    public static int notifyInterval = 1000;
+    public int numNotifies; // used for unit tests only
+
     public Iterator<Tuple> content() {
-        return content.iterator();
+        return new Iterator<Tuple>() {
+             Iterator<Tuple> myIt;
+             int curCall;
+
+            {
+                numNotifies = 0;
+                myIt = content.iterator();
+
+            }
+            public final boolean hasNext(){
+                return myIt.hasNext();
+            }
+            public final Tuple next(){
+                if (curCall < notifyInterval - 1)
+                    curCall ++;
+                else{
+                    if (PigMapReduce.reporter != null)
+                        PigMapReduce.reporter.progress(); 
+                    numNotifies ++;
+                    curCall = 0;
+                } 
+                return myIt.next();
+            }
+            public final void remove(){
+                myIt.remove();
+            }
+        };
     }
     
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Wed Nov 28 17:40:53 2007
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 public class DataBagFileReader {
@@ -35,16 +36,29 @@
                store = f;
        }
        
+    public static int notifyInterval = 1000;
+    public int numNotifies;
        private class myIterator implements Iterator<Tuple>{
                DataInputStream in;
                Tuple nextTuple;
+        int curCall;
                
                public myIterator() throws IOException{
+            numNotifies = 0;
                        in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
                        getNextTuple();
                }
                
                private void getNextTuple() throws IOException{
+            if (curCall < notifyInterval - 1)
+                curCall ++;
+            else{
+                if (PigMapReduce.reporter != null)
+                    PigMapReduce.reporter.progress();
+                curCall = 0;
+                numNotifies ++;
+            }
+
                        try{
                                nextTuple = new Tuple();
                        nextTuple.readFields(in);

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Wed Nov 28 17:40:53 2007
@@ -194,6 +194,18 @@
             caught = true;
         }   
         assertTrue(caught);
+
+        // check that notifications are sent
+         b.clear();
+         DataBag.notifyInterval = 2;
+         Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+         for (int i = 0; i < 10; i++) {
+             b.add(g);
+         }
+
+         Iterator it = b.content();
+         while (it.hasNext()) it.next();
+         assert(b.numNotifies == 5);
     }
 
     @Test
@@ -287,7 +299,7 @@
         }
         
         bag.sort();
-
+        DataBag.notifyInterval = 100;
         it = bag.content();
         count = 0;
         last= "";
@@ -300,6 +312,8 @@
         }
 
         assertTrue(bag.cardinality() == count);
+        int cnt = numItems/DataBag.notifyInterval;
+        assertTrue(bag.numNotifies >= cnt);
         
         bag.clear();
                


Reply via email to