Author: olga
Date: Tue Oct 6 00:46:28 2009
New Revision: 822099
URL: http://svn.apache.org/viewvc?rev=822099&view=rev
Log:
PIG-975: Need a databag that does not register with SpillableMemoryManager and
spill data pro-actively (yinghe via olgan)
Added:
hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct 6 00:46:28 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-975: Need a databag that does not register with SpillableMemoryManager and
+spill data pro-actively (yinghe via olgan)
+
PIG-891: Fixing dfs statement for Pig (zjffdu via daijy).
PIG-956: 10 minute commit tests (olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
Tue Oct 6 00:46:28 2009
@@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -34,6 +35,7 @@
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -196,10 +198,20 @@
//Create numInputs bags
DataBag[] dbs = null;
dbs = new DataBag[numInputs];
- for (int i = 0; i < numInputs; i++) {
- dbs[i] = mBagFactory.newDefaultBag();
- }
+
+ String bagType = null;
+ if (PigMapReduce.sJobConf != null) {
+            bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ }
+ for (int i = 0; i < numInputs; i++) {
+            if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ dbs[i] = mBagFactory.newDefaultBag();
+ } else {
+ dbs[i] = new InternalCachedBag(numInputs);
+ }
+ }
+
//For each indexed tup in the inp, sort them
//into their corresponding bags based
//on the index
@@ -220,7 +232,7 @@
}
if(reporter!=null) reporter.progress();
}
-
+
//Construct the output tuple by appending
//the key and all the above constructed bags
//and return it.
Added: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=822099&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Tue Oct 6
00:46:28 2009
@@ -0,0 +1,228 @@
+
+package org.apache.pig.data;
+
+import java.io.*;
+import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+
+
+public class InternalCachedBag extends DefaultAbstractBag {
+
+ private static final Log log =
LogFactory.getLog(InternalCachedBag.class);
+ private int cacheLimit;
+ private long maxMemUsage;
+ private long memUsage;
+ private DataOutputStream out;
+ private boolean addDone;
+ private TupleFactory factory;
+
+
+ public InternalCachedBag() {
+ this(1);
+ }
+
+ public InternalCachedBag(int bagCount) {
+ float percent = 0.5F;
+
+ if (PigMapReduce.sJobConf != null) {
+ String usage =
PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (usage != null) {
+ percent = Float.parseFloat(usage);
+ }
+ }
+
+ init(bagCount, percent);
+ }
+
+ public InternalCachedBag(int bagCount, float percent) {
+ init(bagCount, percent);
+ }
+
+ private void init(int bagCount, float percent) {
+ factory = TupleFactory.getInstance();
+ mContents = new ArrayList<Tuple>();
+
+ long max = Runtime.getRuntime().maxMemory();
+ maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
+ cacheLimit = Integer.MAX_VALUE;
+
+ // set limit to 0, if memusage is 0 or really really small.
+ // then all tuples are put into disk
+ if (maxMemUsage < 1) {
+ cacheLimit = 0;
+ }
+
+ addDone = false;
+ }
+
+ public void add(Tuple t) {
+
+ if(addDone) {
+ throw new IllegalStateException("InternalCachedBag is closed for
adding new tuples");
+ }
+
+ if(mContents.size() < cacheLimit) {
+ mMemSizeChanged = true;
+ mContents.add(t);
+ if(mContents.size() < 100)
+ {
+ memUsage += t.getMemorySize();
+ long avgUsage = memUsage / (long)mContents.size();
+ cacheLimit = (int)(maxMemUsage / avgUsage);
+ }
+ } else {
+ try {
+ if(out == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Memory can hold "+ mContents.size()
+ " records, put the rest in spill file.");
+ }
+ out = getSpillFile();
+ }
+ t.write(out);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ mSize++;
+ }
+
+ private void addDone() {
+ if(out != null) {
+ try {
+ out.flush();
+ out.close();
+ }
+ catch(IOException e) {
+ // ignore
+ }
+ }
+ addDone = true;
+ }
+
+ public void clear() {
+ if (!addDone) {
+ addDone();
+ }
+ super.clear();
+ addDone = false;
+ out = null;
+ }
+
+ protected void finalize() {
+ if (!addDone) {
+ // close the spill file so it can be deleted
+ addDone();
+ }
+ super.finalize();
+ }
+
+ public boolean isDistinct() {
+ return false;
+ }
+
+ public boolean isSorted() {
+ return false;
+ }
+
+ public Iterator<Tuple> iterator() {
+ if(!addDone) {
+ // close the spill file and mark adding is done
+ // so further adding is disallowed.
+ addDone();
+ }
+ return new CachedBagIterator();
+ }
+
+ public long spill()
+ {
+ throw new RuntimeException("InternalCachedBag.spill() should not be
called");
+ }
+
+ private class CachedBagIterator implements Iterator<Tuple> {
+ Iterator<Tuple> iter;
+ DataInputStream in;
+ Tuple next;
+
+ public CachedBagIterator() {
+ iter = mContents.iterator();
+ if(mSpillFiles != null && mSpillFiles.size() > 0) {
+ File file = (File)mSpillFiles.get(0);
+ try {
+ in = new DataInputStream(new BufferedInputStream(new
FileInputStream(file)));
+ }
+ catch(FileNotFoundException fnfe) {
+ String msg = "Unable to find our spill file.";
+ throw new RuntimeException(msg, fnfe);
+ }
+ }
+ }
+
+
+ public boolean hasNext() {
+ if (next != null) {
+ return true;
+ }
+
+ if(iter.hasNext()){
+ next = (Tuple)iter.next();
+ return true;
+ }
+
+ if(in == null) {
+ return false;
+ }
+
+ try {
+ Tuple t = factory.newTuple();
+ t.readFields(in);
+ next = t;
+ return true;
+ }catch(EOFException eof) {
+ try{
+ in.close();
+ }catch(IOException e) {
+
+ }
+ in = null;
+ return false;
+ }catch(IOException e) {
+ String msg = "Unable to read our spill file.";
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ public Tuple next() {
+ if (next == null) {
+ if (!hasNext()) {
+ throw new IllegalStateException("No more
elements from iterator");
+ }
+ }
+ Tuple t = next;
+ next = null;
+
+ return t;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("remove is not
supported for CachedBagIterator");
+ }
+
+ protected void finalize() {
+ if(in != null) {
+ try
+ {
+ in.close();
+ }
+ catch(Exception e) {
+
+ }
+ }
+ }
+ }
+
+}
+
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=822099&r1=822098&r2=822099&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestDataBag.java Tue Oct 6
00:46:28 2009
@@ -21,7 +21,6 @@
import java.io.IOException;
import org.junit.Test;
-
import org.apache.pig.data.*;
import org.apache.pig.impl.util.Spillable;
@@ -791,6 +790,62 @@
}
assertEquals(bg1, bg2);
}
+
+ public void testInternalCachedBag() throws Exception {
+ // check equal of bags
+ DataBag bg1 = new InternalCachedBag(1, 0.5f);
+ assertEquals(bg1.size(), 0);
+
+ String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, {
"e", "f"} };
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg1.add(Util.createTuple(tupleContents[i]));
+ }
+
+ // check size, and isSorted(), isDistinct()
+ assertEquals(bg1.size(), 3);
+ assertFalse(bg1.isSorted());
+ assertFalse(bg1.isDistinct());
+
+ tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
+ DataBag bg2 = new InternalCachedBag(1, 0.5f);
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg2.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg1, bg2);
+
+ // check bag with data written to disk
+ DataBag bg3 = new InternalCachedBag(1, 0.0f);
+ tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
+ for (int i = 0; i < tupleContents.length; i++) {
+ bg3.add(Util.createTuple(tupleContents[i]));
+ }
+ assertEquals(bg1, bg3);
+
+ // check iterator
+ Iterator<Tuple> iter = bg3.iterator();
+ DataBag bg4 = new InternalCachedBag(1, 0.0f);
+ while(iter.hasNext()) {
+ bg4.add(iter.next());
+ }
+ assertEquals(bg3, bg4);
+
+ // call iterator methods with irregular order
+ iter = bg3.iterator();
+ assertTrue(iter.hasNext());
+ assertTrue(iter.hasNext());
+ DataBag bg5 = new InternalCachedBag(1, 0.0f);
+ bg5.add(iter.next());
+ bg5.add(iter.next());
+ assertTrue(iter.hasNext());
+ bg5.add(iter.next());
+ assertFalse(iter.hasNext());
+ assertFalse(iter.hasNext());
+ assertEquals(bg3, bg5);
+
+
+ bg4.clear();
+ assertEquals(bg4.size(), 0);
+ }
}