Author: utkarsh
Date: Fri Dec  7 16:55:29 2007
New Revision: 602287

URL: http://svn.apache.org/viewvc?rev=602287&view=rev
Log:
PIG-44: Added adaptive decision of the number of records to hold in memory 
        before spilling

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Dec  7 16:55:29 2007
@@ -50,3 +50,6 @@
     PIG-47: Added methods to DataMap to provide access to its content
 
        PIG-12: Added time stamps to log4j messages (phunt via gates).
+
+       PIG-44: Added adaptive decision of the number of records to hold in memory
+       before spilling (utkarsh)

Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Fri Dec  7 16:55:29 2007
@@ -40,7 +40,9 @@
     
     boolean finishedAdds = false,wantSorting = false, doneSorting = false, 
sortInProgress = false, wroteUnsortedFile = false;
     int trueCount = 0;
-
+    int numRecordsToHoldInMemory = 1000;
+    
+    
     boolean eliminateDuplicates = false;
     EvalSpec spec = null;
     
@@ -50,7 +52,10 @@
      * cause us to switch to disk backed mode
      */
     public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25);
-    
+    /**
+     * want to hold roughly only 1% of max memory, once we are in a low memory condition
+     */
+    public static long TARGET_IN_MEMORY_SIZE = (long)(0.01 * MAX_MEMORY);
     
     public BigDataBag(File tempdir) throws IOException {
        this.tempdir = tempdir;
@@ -61,10 +66,16 @@
        long usedMemory = Runtime.getRuntime().totalMemory() - freeMemory;
        return MAX_MEMORY-usedMemory > memLimit;
     }
-        
+    
+    private File getTempFile() throws IOException{
+               File store = File.createTempFile("bag",".dat",tempdir);
+               store.deleteOnExit();
+               return store;
+    }
+    
     private void writeContentToDisk() throws IOException{
        if (writer==null){
-                       File store = File.createTempFile("bag",".dat",tempdir);
+                       File store = getTempFile();
                        stores.add(store);
                        writer = new DataBagFileWriter(store);
                }
@@ -76,7 +87,10 @@
        }else{
                wroteUnsortedFile = true;
        }
-       writer.write(content.iterator());
+       long bytesWritten = writer.write(content.iterator());
+       //Adjust the number of records to hold in memory so that what 
+       //we hold in memory is about TARGET_IN_MEMORY_SIZE
+       numRecordsToHoldInMemory = (int)((numRecordsToHoldInMemory * 
TARGET_IN_MEMORY_SIZE) / bytesWritten);
        super.clear();
        if (wantSorting){
                writer.close();
@@ -95,7 +109,7 @@
                if (writer == null) {
                        //Want to add in memory
                        super.add(t);
-                   if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && 
trueCount > 10) {
+                   if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && 
content.size() > numRecordsToHoldInMemory) {
                        writeContentToDisk();
                    }   
                }else{
@@ -133,7 +147,7 @@
        Iterator<Tuple> iter = reader.content();
        while(iter.hasNext()){
                DataBag bag = new DataBag();
-               while( iter.hasNext() && 
isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){
+               while( iter.hasNext() && 
(isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2) || bag.cardinality() < 
numRecordsToHoldInMemory)){
                        bag.add(iter.next());
                }
                if(eliminateDuplicates){
@@ -141,7 +155,7 @@
                        trueCount = bag.cardinality();
                }else
                        bag.sort(spec);
-               File f = File.createTempFile("bag", ".dat",tempdir);
+               File f = getTempFile();
                stores.add(f);
                DataBagFileWriter writer = new DataBagFileWriter(f);
                writer.write(bag.content());
@@ -306,7 +320,7 @@
                        }
                }
                
-               File outputFile  = File.createTempFile("bag",".dat",tempdir);
+               File outputFile  = getTempFile();
                stores.add(outputFile);
                writer = new DataBagFileWriter(outputFile);
                

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Dec  7 16:55:29 2007
@@ -41,10 +41,20 @@
                t.write(out);
        }
        
-       public void write(Iterator<Tuple> iter) throws IOException{
+       public long write(Iterator<Tuple> iter) throws IOException{
+       
+               long initialSize = getFileLength();
                while (iter.hasNext())
-                       iter.next().write(out);
+                       iter.next().write(out);
+               
+               return getFileLength() - initialSize;
        }
+       
+       public long getFileLength() throws IOException{
+               out.flush();
+               return store.length();
+       }
+       
        
        public void close() throws IOException{
                flush();

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Dec  7 16:55:29 2007
@@ -19,7 +19,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -216,176 +215,78 @@
 
     public void testBigDataBagOnDisk() throws Exception{
        Runtime.getRuntime().gc();
-       testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+       testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
     }
 
+    private enum TestType {
+       PRE_SORT,
+       POST_SORT,
+       PRE_DISTINCT,
+       POST_DISTINCT,
+       NONE
+    }
+       
     
     private void testBigDataBag(long freeMemoryToMaintain, int numItems) 
throws Exception {
        BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
-       File tmp = File.createTempFile("test", "bag").getParentFile();
-        BigDataBag bag = new BigDataBag(tmp);
-        Iterator<Tuple> it;
-        int count;
-        String last;
-    
         Random r = new Random();
-        
-        
-        //Basic test
-        assertTrue(bag.isEmpty());
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(2);
-            t.setField(0, Integer.toHexString(i));
-            t.setField(1, i);
-            bag.add(t);
-        }
-        
-        assertFalse(bag.isEmpty());
-
-        assertTrue(bag.cardinality() == numItems);
-        
-        int lastI = -1;
-        it = bag.content();
-        count = 0;
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
-            
assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
-            assertEquals(lastI+1, ix);
-            lastI = ix;
-            count++;
-        }
-        
-        assertTrue(bag.cardinality() == count);
-        
-        bag.distinct();
-
-        bag.clear();
+   
+       for (TestType testType: TestType.values()){
+               BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+            assertTrue(bag.isEmpty());
+
+            if (testType == TestType.PRE_SORT)
+               bag.sort();
+            else if (testType == TestType.PRE_DISTINCT)
+               bag.distinct();
+            
+            //generate data and add it to the bag
+            for(int i = 0; i < numItems; i++) {
+                Tuple t = new Tuple(1);
+                t.setField(0, r.nextInt(numItems));
+                bag.add(t);
+            }
+
+            assertFalse(bag.isEmpty());
+
+            if (testType == TestType.POST_SORT)
+               bag.sort();
+            else if (testType == TestType.POST_DISTINCT)
+               bag.distinct();
+
+            
+            if (testType == TestType.NONE)
+               assertTrue(bag.cardinality() == numItems);
+            checkContents(bag, numItems, testType);
+            checkContents(bag, numItems, testType);
 
-        //Test pre sort
-        
-        bag.sort();
-        
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-
-        //Test post sort
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
+       }
+    }
+     
+    
+    private void checkContents(DataBag bag, int numItems, TestType testType) 
throws Exception{
+        String last = "";
         
-        bag.sort();
         DataBag.notifyInterval = 100;
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        int cnt = numItems/DataBag.notifyInterval;
-        assertTrue(bag.numNotifies >= cnt);
-        
-        bag.clear();
-               
-        //test post-distinct
-        
-       
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(1000));
-            bag.add(t);
-        }
         
-        
-        bag.distinct();
-
-        it = bag.content();
-        count = 0;
-        last= "";
+        Iterator<Tuple> it = bag.content();
+        int count = 0;
         while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
+               Tuple t = it.next();
+               String next = t.getAtomField(0).strval();
+               if (testType == TestType.POST_SORT || testType == 
TestType.PRE_SORT)
+                assertTrue(last.compareTo(next)<=0);
+               else if (testType == TestType.POST_DISTINCT || testType == 
TestType.PRE_DISTINCT)
+                assertTrue(last.compareTo(next)<0);
             last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-               
-        
-        //Test pre distinct
-
-        bag.distinct();
-
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(10));
-            bag.add(t);
+               count++;
         }
         
-
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-
-        //Check if it gives the correct contents the second time around
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-            count++;
-        }
-
         assertTrue(bag.cardinality() == count);
         
-        bag.clear();
+        if (testType != TestType.NONE)
+               assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
     }
+
 }


Reply via email to