Author: olga
Date: Tue Oct  6 00:46:28 2009
New Revision: 822099

URL: http://svn.apache.org/viewvc?rev=822099&view=rev
Log:
PIG-975: Need a databag that does not register with SpillableMemoryManager and
spill data pro-actively (yinghe via olgan)
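
In short: POPackage now builds its per-input bags as InternalCachedBag unless
the job property pig.cachedbag.type is set to "default". InternalCachedBag
keeps only a bounded number of tuples in memory (a fraction of the heap given
by pig.cachedbag.memusage, divided among the bags) and writes the rest
straight to a spill file, so it never registers with the
SpillableMemoryManager.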

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct  6 00:46:28 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-975: Need a databag that does not register with SpillableMemoryManager and
+spill data pro-actively (yinghe via olgan)
+
 PIG-891: Fixing dfs statement for Pig (zjffdu via daijy).
 
 PIG-956: 10 minute commit tests (olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Oct  6 00:46:28 2009
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -34,6 +35,7 @@
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -196,10 +198,20 @@
             //Create numInputs bags
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
-            for (int i = 0; i < numInputs; i++) {
-                dbs[i] = mBagFactory.newDefaultBag();
-            }
+
+            String bagType = null;
+            if (PigMapReduce.sJobConf != null) {
+                bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+            }
 
+            for (int i = 0; i < numInputs; i++) {
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    dbs[i] = mBagFactory.newDefaultBag();
+                } else {
+                    dbs[i] = new InternalCachedBag(numInputs);
+                }
+            }
+
             //For each indexed tup in the inp, sort them
             //into their corresponding bags based
             //on the index
@@ -220,7 +232,7 @@
                 }
                 if(reporter!=null) reporter.progress();
             }
-            
+                      
             //Construct the output tuple by appending
             //the key and all the above constructed bags
             //and return it.
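
The bag choice above is driven by two job properties read through
PigMapReduce.sJobConf: pig.cachedbag.type selects the bag implementation, and
pig.cachedbag.memusage (read by InternalCachedBag below) sets the fraction of
the heap the cached bags may share. A minimal sketch of how a job might set
them, assuming a plain Hadoop JobConf; the class name and values are
illustrative:

    import org.apache.hadoop.mapred.JobConf;

    public class CachedBagConfExample {
        public static JobConf configure() {
            JobConf conf = new JobConf();
            // revert POPackage to the old behavior (DefaultDataBag)
            conf.set("pig.cachedbag.type", "default");
            // let cached bags keep up to 10% of the heap in memory
            conf.set("pig.cachedbag.memusage", "0.1");
            return conf;
        }
    }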

Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=822099&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Oct  6 00:46:28 2009
@@ -0,0 +1,228 @@
+
+package org.apache.pig.data;
+
+import java.io.*;
+import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+public class InternalCachedBag extends DefaultAbstractBag {
+
+    private static final Log log = LogFactory.getLog(InternalCachedBag.class);
+    private int cacheLimit;
+    private long maxMemUsage;
+    private long memUsage;
+    private DataOutputStream out;
+    private boolean addDone;
+    private TupleFactory factory;
+
+ 
+    public InternalCachedBag() {
+        this(1);
+    }
+
+    public InternalCachedBag(int bagCount) {       
+        float percent = 0.5F;
+        
+        if (PigMapReduce.sJobConf != null) {
+            String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+            if (usage != null) {
+                percent = Float.parseFloat(usage);
+            }
+        }
+
+        init(bagCount, percent);
+    }  
+    
+    public InternalCachedBag(int bagCount, float percent) {
+        init(bagCount, percent);
+    }
+
+    private void init(int bagCount, float percent) {
+        factory = TupleFactory.getInstance();
+        mContents = new ArrayList<Tuple>();
+
+        long max = Runtime.getRuntime().maxMemory();
+        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+        cacheLimit = Integer.MAX_VALUE;
+
+        // if the memory usage limit is 0 (or very small), set the cache
+        // limit to 0 so that all tuples are put on disk
+        if (maxMemUsage < 1) {
+            cacheLimit = 0;
+        }
+
+        addDone = false;
+    }
+
+    public void add(Tuple t) {
+        if (addDone) {
+            throw new IllegalStateException("InternalCachedBag is closed for adding new tuples");
+        }
+
+        if (mContents.size() < cacheLimit) {
+            mMemSizeChanged = true;
+            mContents.add(t);
+            // refine the cache limit from the average size of the first 100 tuples
+            if (mContents.size() < 100) {
+                memUsage += t.getMemorySize();
+                long avgUsage = memUsage / (long)mContents.size();
+                if (avgUsage > 0) {
+                    cacheLimit = (int)(maxMemUsage / avgUsage);
+                }
+            }
+        } else {
+            try {
+                if (out == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Memory can hold " + mContents.size() + " records, put the rest in spill file.");
+                    }
+                    out = getSpillFile();
+                }
+                t.write(out);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        mSize++;
+    }
+
+    private void addDone() {
+        if (out != null) {
+            try {
+                out.flush();
+                out.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        addDone = true;
+    }
+
+    public void clear() {
+        if (!addDone) {
+            addDone();
+        }
+        super.clear();
+        addDone = false;
+        out = null;
+    }
+
+    protected void finalize() {
+        if (!addDone) {
+            // close the spill file so it can be deleted
+            addDone();
+        }
+        super.finalize();
+    }
+    
+    public boolean isDistinct() {
+        return false;
+    }
+
+    public boolean isSorted() {
+        return false;
+    }
+
+    public Iterator<Tuple> iterator() {
+        if (!addDone) {
+            // close the spill file and mark adding as done
+            // so further adding is disallowed.
+            addDone();
+        }
+        return new CachedBagIterator();
+    }
+
+    public long spill() {
+        throw new RuntimeException("InternalCachedBag.spill() should not be called");
+    }
+    
+    private class CachedBagIterator implements Iterator<Tuple> {
+        Iterator<Tuple> iter;
+        DataInputStream in;
+        Tuple next;
+
+        public CachedBagIterator() {
+            iter = mContents.iterator();
+            if (mSpillFiles != null && mSpillFiles.size() > 0) {
+                File file = (File)mSpillFiles.get(0);
+                try {
+                    in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
+                } catch (FileNotFoundException fnfe) {
+                    String msg = "Unable to find our spill file.";
+                    throw new RuntimeException(msg, fnfe);
+                }
+            }
+        }
+
+
+        public boolean hasNext() {
+            if (next != null) {
+                return true;
+            }
+
+            if (iter.hasNext()) {
+                next = (Tuple)iter.next();
+                return true;
+            }
+
+            if (in == null) {
+                return false;
+            }
+
+            try {
+                Tuple t = factory.newTuple();
+                t.readFields(in);
+                next = t;
+                return true;
+            } catch (EOFException eof) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+                in = null;
+                return false;
+            } catch (IOException e) {
+                String msg = "Unable to read our spill file.";
+                throw new RuntimeException(msg, e);
+            }
+        }
+
+        public Tuple next() {
+            if (next == null) {
+                if (!hasNext()) {
+                    throw new IllegalStateException("No more elements from iterator");
+                }
+            }
+            Tuple t = next;
+            next = null;
+            return t;
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException("remove is not supported for CachedBagIterator");
+        }
+
+        protected void finalize() {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+}
+
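
A minimal usage sketch of the new bag; the class name and tuple values are
illustrative, and the two-argument constructor takes the heap fraction
directly (the one-argument form reads pig.cachedbag.memusage from the job
configuration instead):

    import java.util.Iterator;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.InternalCachedBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InternalCachedBagExample {
        public static void main(String[] args) throws Exception {
            // one bag, allowed to keep roughly 10% of the heap in memory;
            // tuples beyond that are written to a spill file as they arrive
            DataBag bag = new InternalCachedBag(1, 0.1f);
            TupleFactory tf = TupleFactory.getInstance();
            for (int i = 0; i < 1000000; i++) {
                Tuple t = tf.newTuple(1);
                t.set(0, Integer.valueOf(i));   // Tuple.set may throw ExecException
                bag.add(t);
            }
            // the first iterator() call closes the bag for adding; any
            // further add() throws IllegalStateException
            Iterator<Tuple> it = bag.iterator();
            long n = 0;
            while (it.hasNext()) {
                it.next();
                n++;
            }
            System.out.println(n + " of " + bag.size() + " tuples read back");
        }
    }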

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Tue Oct  6 00:46:28 2009
@@ -21,7 +21,6 @@
 import java.io.IOException;
 
 import org.junit.Test;
-
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.Spillable;
 
@@ -791,6 +790,62 @@
         }
         assertEquals(bg1, bg2);
     }
+    
+    public void testInternalCachedBag() throws Exception {
+        // check equality of bags
+        DataBag bg1 = new InternalCachedBag(1, 0.5f);
+        assertEquals(bg1.size(), 0);
+
+        String[][] tupleContents = new String[][] {{"a", "b"}, {"c", "d"}, {"e", "f"}};
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg1.add(Util.createTuple(tupleContents[i]));
+        }
+
+        // check size(), isSorted() and isDistinct()
+        assertEquals(bg1.size(), 3);
+        assertFalse(bg1.isSorted());
+        assertFalse(bg1.isDistinct());
+
+        tupleContents = new String[][] {{"c", "d"}, {"a", "b"}, {"e", "f"}};
+        DataBag bg2 = new InternalCachedBag(1, 0.5f);
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg2.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg1, bg2);
+
+        // check a bag whose data is written to disk (memory fraction 0)
+        DataBag bg3 = new InternalCachedBag(1, 0.0f);
+        tupleContents = new String[][] {{"e", "f"}, {"c", "d"}, {"a", "b"}};
+        for (int i = 0; i < tupleContents.length; i++) {
+            bg3.add(Util.createTuple(tupleContents[i]));
+        }
+        assertEquals(bg1, bg3);
+
+        // check the iterator
+        Iterator<Tuple> iter = bg3.iterator();
+        DataBag bg4 = new InternalCachedBag(1, 0.0f);
+        while (iter.hasNext()) {
+            bg4.add(iter.next());
+        }
+        assertEquals(bg3, bg4);
+
+        // call the iterator methods in an irregular order
+        iter = bg3.iterator();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasNext());
+        DataBag bg5 = new InternalCachedBag(1, 0.0f);
+        bg5.add(iter.next());
+        bg5.add(iter.next());
+        assertTrue(iter.hasNext());
+        bg5.add(iter.next());
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasNext());
+        assertEquals(bg3, bg5);
+
+        bg4.clear();
+        assertEquals(bg4.size(), 0);
+    }
 }
 
 

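For a concrete sense of the thresholds these tests exercise: with percent =
0.5f and bagCount = 1, maxMemUsage is half the heap, so the three small test
tuples stay in memory; with percent = 0.0f, maxMemUsage is 0, init() sets
cacheLimit to 0, and every tuple goes straight to the spill file, which is
the on-disk path bg3, bg4 and bg5 cover.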
