Author: olga
Date: Tue Oct 6 00:46:28 2009
New Revision: 822099

URL: http://svn.apache.org/viewvc?rev=822099&view=rev
Log:
PIG-975: Need a databag that does not register with SpillableMemoryManager and spill data pro-actively (yinghe via olgan)
Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822099&r1=822098&r2=822099&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Oct 6 00:46:28 2009 @@ -26,6 +26,9 @@ IMPROVEMENTS +PIG-975: Need a databag that does not register with SpillableMemoryManager and +spill data pro-actively (yinghe via olgan) + PIG-891: Fixing dfs statement for Pig (zjffdu via daijy). PIG-956: 10 minute commit tests (olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=822099&r1=822098&r2=822099&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Oct 6 00:46:28 2009 @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.BagFactory; +import org.apache.pig.data.InternalCachedBag; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -34,6 +35,7 @@ import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.OperatorKey; import 
org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; @@ -196,10 +198,20 @@ //Create numInputs bags DataBag[] dbs = null; dbs = new DataBag[numInputs]; - for (int i = 0; i < numInputs; i++) { - dbs[i] = mBagFactory.newDefaultBag(); - } + + String bagType = null; + if (PigMapReduce.sJobConf != null) { + bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type"); + } + for (int i = 0; i < numInputs; i++) { + if (bagType != null && bagType.equalsIgnoreCase("default")) { + dbs[i] = mBagFactory.newDefaultBag(); + } else { + dbs[i] = new InternalCachedBag(numInputs); + } + } + //For each indexed tup in the inp, sort them //into their corresponding bags based //on the index @@ -220,7 +232,7 @@ } if(reporter!=null) reporter.progress(); } - + //Construct the output tuple by appending //the key and all the above constructed bags //and return it. 
Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=822099&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Oct 6 00:46:28 2009 @@ -0,0 +1,228 @@ + +package org.apache.pig.data; + +import java.io.*; +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; + + +public class InternalCachedBag extends DefaultAbstractBag { + + private static final Log log = LogFactory.getLog(InternalCachedBag.class); + private int cacheLimit; + private long maxMemUsage; + private long memUsage; + private DataOutputStream out; + private boolean addDone; + private TupleFactory factory; + + + public InternalCachedBag() { + this(1); + } + + public InternalCachedBag(int bagCount) { + float percent = 0.5F; + + if (PigMapReduce.sJobConf != null) { + String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage"); + if (usage != null) { + percent = Float.parseFloat(usage); + } + } + + init(bagCount, percent); + } + + public InternalCachedBag(int bagCount, float percent) { + init(bagCount, percent); + } + + private void init(int bagCount, float percent) { + factory = TupleFactory.getInstance(); + mContents = new ArrayList<Tuple>(); + + long max = Runtime.getRuntime().maxMemory(); + maxMemUsage = (long)(((float)max * percent) / (float)bagCount); + cacheLimit = Integer.MAX_VALUE; + + // set limit to 0, if memusage is 0 or really really small. 
+ // then all tuples are put into disk + if (maxMemUsage < 1) { + cacheLimit = 0; + } + + addDone = false; + } + + public void add(Tuple t) { + + if(addDone) { + throw new IllegalStateException("InternalCachedBag is closed for adding new tuples"); + } + + if(mContents.size() < cacheLimit) { + mMemSizeChanged = true; + mContents.add(t); + if(mContents.size() < 100) + { + memUsage += t.getMemorySize(); + long avgUsage = memUsage / (long)mContents.size(); + cacheLimit = (int)(maxMemUsage / avgUsage); + } + } else { + try { + if(out == null) { + if (log.isDebugEnabled()) { + log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file."); + } + out = getSpillFile(); + } + t.write(out); + } + catch(IOException e) { + throw new RuntimeException(e); + } + } + + mSize++; + } + + private void addDone() { + if(out != null) { + try { + out.flush(); + out.close(); + } + catch(IOException e) { + // ignore + } + } + addDone = true; + } + + public void clear() { + if (!addDone) { + addDone(); + } + super.clear(); + addDone = false; + out = null; + } + + protected void finalize() { + if (!addDone) { + // close the spill file so it can be deleted + addDone(); + } + super.finalize(); + } + + public boolean isDistinct() { + return false; + } + + public boolean isSorted() { + return false; + } + + public Iterator<Tuple> iterator() { + if(!addDone) { + // close the spill file and mark adding is done + // so further adding is disallowed. 
+ addDone(); + } + return new CachedBagIterator(); + } + + public long spill() + { + throw new RuntimeException("InternalCachedBag.spill() should not be called"); + } + + private class CachedBagIterator implements Iterator<Tuple> { + Iterator<Tuple> iter; + DataInputStream in; + Tuple next; + + public CachedBagIterator() { + iter = mContents.iterator(); + if(mSpillFiles != null && mSpillFiles.size() > 0) { + File file = (File)mSpillFiles.get(0); + try { + in = new DataInputStream(new BufferedInputStream(new FileInputStream(file))); + } + catch(FileNotFoundException fnfe) { + String msg = "Unable to find our spill file."; + throw new RuntimeException(msg, fnfe); + } + } + } + + + public boolean hasNext() { + if (next != null) { + return true; + } + + if(iter.hasNext()){ + next = (Tuple)iter.next(); + return true; + } + + if(in == null) { + return false; + } + + try { + Tuple t = factory.newTuple(); + t.readFields(in); + next = t; + return true; + }catch(EOFException eof) { + try{ + in.close(); + }catch(IOException e) { + + } + in = null; + return false; + }catch(IOException e) { + String msg = "Unable to read our spill file."; + throw new RuntimeException(msg, e); + } + } + + public Tuple next() { + if (next == null) { + if (!hasNext()) { + throw new IllegalStateException("No more elements from iterator"); + } + } + Tuple t = next; + next = null; + + return t; + } + + public void remove() { + throw new UnsupportedOperationException("remove is not supported for CachedBagIterator"); + } + + protected void finalize() { + if(in != null) { + try + { + in.close(); + } + catch(Exception e) { + + } + } + } + } + +} + Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=822099&r1=822098&r2=822099&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java (original) +++ 
hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Tue Oct 6 00:46:28 2009 @@ -21,7 +21,6 @@ import java.io.IOException; import org.junit.Test; - import org.apache.pig.data.*; import org.apache.pig.impl.util.Spillable; @@ -791,6 +790,62 @@ } assertEquals(bg1, bg2); } + + public void testInternalCachedBag() throws Exception { + // check equal of bags + DataBag bg1 = new InternalCachedBag(1, 0.5f); + assertEquals(bg1.size(), 0); + + String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; + for (int i = 0; i < tupleContents.length; i++) { + bg1.add(Util.createTuple(tupleContents[i])); + } + + // check size, and isSorted(), isDistinct() + assertEquals(bg1.size(), 3); + assertFalse(bg1.isSorted()); + assertFalse(bg1.isDistinct()); + + tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} }; + DataBag bg2 = new InternalCachedBag(1, 0.5f); + for (int i = 0; i < tupleContents.length; i++) { + bg2.add(Util.createTuple(tupleContents[i])); + } + assertEquals(bg1, bg2); + + // check bag with data written to disk + DataBag bg3 = new InternalCachedBag(1, 0.0f); + tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}}; + for (int i = 0; i < tupleContents.length; i++) { + bg3.add(Util.createTuple(tupleContents[i])); + } + assertEquals(bg1, bg3); + + // check iterator + Iterator<Tuple> iter = bg3.iterator(); + DataBag bg4 = new InternalCachedBag(1, 0.0f); + while(iter.hasNext()) { + bg4.add(iter.next()); + } + assertEquals(bg3, bg4); + + // call iterator methods with irregular order + iter = bg3.iterator(); + assertTrue(iter.hasNext()); + assertTrue(iter.hasNext()); + DataBag bg5 = new InternalCachedBag(1, 0.0f); + bg5.add(iter.next()); + bg5.add(iter.next()); + assertTrue(iter.hasNext()); + bg5.add(iter.next()); + assertFalse(iter.hasNext()); + assertFalse(iter.hasNext()); + assertEquals(bg3, bg5); + + + bg4.clear(); + assertEquals(bg4.size(), 0); + } }