Author: olga Date: Wed Nov 28 17:40:53 2007 New Revision: 599242 URL: http://svn.apache.org/viewvc?rev=599242&view=rev Log: PIG-14: added heartbeat functionality
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java incubator/pig/trunk/src/org/apache/pig/data/DataBag.java incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=599242&r1=599241&r2=599242&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Wed Nov 28 17:40:53 2007 @@ -29,3 +29,5 @@ PIG-33 Help was commented out - uncommented (olgan) PIG-31: second half of concurrent mode problem addressed (olgan) + + PIG-14: added heartbeat functionality (olgan) Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=599242&r1=599241&r2=599242&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original) +++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Wed Nov 28 17:40:53 2007 @@ -29,6 +29,7 @@ import org.apache.pig.impl.eval.StarSpec; import org.apache.pig.impl.io.DataBagFileReader; import org.apache.pig.impl.io.DataBagFileWriter; +import org.apache.pig.impl.mapreduceExec.PigMapReduce; public class BigDataBag extends DataBag { @@ -62,7 +63,6 @@ } private void writeContentToDisk() throws IOException{ - if (writer==null){ File store = File.createTempFile("bag",".dat",tempdir); stores.add(store); @@ -82,7 +82,6 @@ writer.close(); writer = null; } - } @Override @@ -177,7 +176,6 @@ @Override public Iterator<Tuple> content() { - if (sortInProgress) throw new RuntimeException("Cannot open another iterator: a sort is in progress"); @@ -272,11 +270,12 @@ private class FileMerger implements Iterator<Tuple>{ PriorityQueue<HeapEntry> heap; private final int FANIN_LIMIT = 25; + int curCall; DataBagFileWriter writer; HeapEntry nextEntry; public FileMerger() throws IOException{ - + numNotifies = 0; Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){ public int compare(HeapEntry he1, HeapEntry he2){ try @@ -317,6 +316,14 @@ } private void getNextEntry() throws IOException{ + if (curCall < notifyInterval - 1) + curCall ++; + else{ + if (PigMapReduce.reporter != null) + PigMapReduce.reporter.progress(); + curCall = 0; + numNotifies ++; + } if (heap.isEmpty()){ nextEntry = null; writer.close(); Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=599242&r1=599241&r2=599242&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original) +++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Wed Nov 28 17:40:53 2007 @@ -27,6 +27,7 @@ import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.eval.collector.DataCollector; +import org.apache.pig.impl.mapreduceExec.PigMapReduce; /** @@ -114,9 +115,18 @@ Collections.sort(content); isSorted = true; + int curCall = 0; Tuple lastTup = null; for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) { + if (curCall < notifyInterval - 1) + curCall++; + else + { + if (PigMapReduce.reporter != null) + PigMapReduce.reporter.progress(); + curCall = 0; + } Tuple thisTup = it.next(); if (lastTup == null) { @@ -132,8 +142,37 @@ } } + public static int notifyInterval = 1000; + public int numNotifies; // used for unit tests only + public Iterator<Tuple> content() { - return content.iterator(); + return new Iterator<Tuple>() { + Iterator<Tuple> myIt; + int curCall; + + { + numNotifies = 0; + myIt = content.iterator(); + + } + public final boolean hasNext(){ + return myIt.hasNext(); + } + public final Tuple next(){ + if (curCall < notifyInterval - 1) + curCall ++; + else{ + if (PigMapReduce.reporter != null) + PigMapReduce.reporter.progress(); + numNotifies ++; + curCall = 0; + } + return myIt.next(); + } + public final void remove(){ + myIt.remove(); + } + }; } Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=599242&r1=599241&r2=599242&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Wed Nov 28 17:40:53 2007 @@ -26,6 +26,7 @@ import java.util.Iterator; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.mapreduceExec.PigMapReduce; public class DataBagFileReader { @@ -35,16 +36,29 @@ store = f; } + public static int notifyInterval = 1000; + public int numNotifies; private class myIterator implements Iterator<Tuple>{ DataInputStream in; Tuple nextTuple; + int curCall; public myIterator() throws IOException{ + numNotifies = 0; in = new DataInputStream(new BufferedInputStream(new FileInputStream(store))); getNextTuple(); } private void getNextTuple() throws IOException{ + if (curCall < notifyInterval - 1) + curCall ++; + else{ + if (PigMapReduce.reporter != null) + PigMapReduce.reporter.progress(); + curCall = 0; + numNotifies ++; + } + try{ nextTuple = new Tuple(); nextTuple.readFields(in); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=599242&r1=599241&r2=599242&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Wed Nov 28 17:40:53 2007 @@ -194,6 +194,18 @@ caught = true; } assertTrue(caught); + + // check that notifications are sent + b.clear(); + DataBag.notifyInterval = 2; + Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1); + for (int i = 0; i < 10; i++) { + b.add(g); + } + + Iterator it = b.content(); + while (it.hasNext()) it.next(); + assert(b.numNotifies == 5); } @Test @@ -287,7 +299,7 @@ } bag.sort(); - + DataBag.notifyInterval = 100; it = bag.content(); count = 0; last= ""; @@ -300,6 +312,8 @@ } assertTrue(bag.cardinality() == count); + int cnt = numItems/DataBag.notifyInterval; + assertTrue(bag.numNotifies >= cnt); bag.clear();