Author: hashutosh
Date: Mon Feb  8 19:19:37 2010
New Revision: 907760

URL: http://svn.apache.org/viewvc?rev=907760&view=rev
Log:
PIG-1224: Collected group should change to use new (internal) bag
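
The same selection idiom is applied in each of the three operators touched by this patch: consult the pig.cachedbag.type job property once, then allocate either a default in-memory bag or the proactively spilling InternalCachedBag. The minimal sketch below isolates that idiom; the class name BagChoiceSketch and the helper chooseBag are hypothetical (the real patch inlines this logic in each operator's getNext), but the calls shown are the ones used in the diff that follows.

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.InternalCachedBag;

    // Hypothetical standalone illustration of the bag-selection logic added by PIG-1224.
    public class BagChoiceSketch {

        private boolean firstTime = true;
        private boolean useDefaultBag = false;

        // Decide once whether the user forced default (in-memory) bags via
        // pig.cachedbag.type=default; otherwise prefer the spill-capable bag.
        public DataBag chooseBag(int numInputs) {
            if (firstTime) {
                firstTime = false;
                if (PigMapReduce.sJobConf != null) {
                    String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
                        useDefaultBag = true;
                    }
                }
            }
            return useDefaultBag
                    ? BagFactory.getInstance().newDefaultBag()
                    : new InternalCachedBag(numInputs); // spills proactively under memory pressure
        }
    }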

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Feb  8 19:19:37 2010
@@ -24,6 +24,8 @@
 
 IMPROVEMENTS
 
+PIG-1224: Collected group should change to use new (internal) bag (ashutoshc)
+
 PIG-1046: join algorithm specification is within double quotes (ashutoshc)
 
 PIG-1209: Port POJoinPackage to proactively spill (ashutoshc)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Feb  8 19:19:37 2010
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -69,6 +71,8 @@
     
     private Object prevKey = null;
     
+    private boolean useDefaultBag = false;
+    
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -199,8 +203,24 @@
 
             // the first time, just create a new buffer and continue.
             if (prevKey == null && outputBag == null) {
+                
+                if (PigMapReduce.sJobConf != null) {
+                    String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                        useDefaultBag = true;
+                    }
+                }
                 prevKey = curKey;
-                outputBag = BagFactory.getInstance().newDefaultBag();
+                outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In the very rare case where a POStream follows this
+                // POCollectedGroup in the pipeline and is also blocking the pipeline,
+                // the constructor argument should be 2. But for that one obscure
+                // case we don't want to pay the penalty all the time.
+
+                // Additionally, if a merge join (on a different key) follows POCollectedGroup,
+                // default bags should be used. But since we currently don't allow anything
+                // before Merge Join, we are good.
+                        : new InternalCachedBag(1);
                 outputBag.add((Tuple)tup.get(1));
                 continue;
             }
@@ -224,7 +244,8 @@
             res.result = tup2;
                
             prevKey = curKey;
-            outputBag = BagFactory.getInstance().newDefaultBag();
+            outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                    : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
 
             return res;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Mon Feb  8 19:19:37 2010
@@ -25,6 +25,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
@@ -44,6 +45,8 @@
     private boolean lastInputTuple = false;
     private static final Tuple t1 = null;
     private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+    private boolean firstTime = true;
+    private boolean useDefaultBag = false;
 
     public static final String DEFAULT_CHUNK_SIZE = "1000";
 
@@ -100,6 +103,16 @@
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
+        
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
         // if a previous call to foreach.getNext()
         // has still not returned all output, process it
         if (forEach.processingPlan)
@@ -126,17 +139,14 @@
         {
             lastInputTuple = false;
             //Put n-1 inputs into bags
-            String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-                   bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
-               }
             dbs = new DataBag[numInputs];
             for (int i = 0; i < numInputs; i++) {
-                if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                    dbs[i] = mBagFactory.newDefaultBag();                    
-                } else {
-                 dbs[i] = new InternalCachedBag(numInputs);
-             }    
+                dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In the very rare case where a POStream follows this
+                // POJoinPackage in the pipeline and is also blocking the pipeline,
+                // the constructor argument should be 2 * numInputs. But for that one
+                // obscure case we don't want to pay the penalty all the time.
+                        : new InternalCachedBag(numInputs);
             }
             
             //For each Nullable tuple in the input, put it

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Feb  8 19:19:37 2010
@@ -115,6 +115,10 @@
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+    
+    private boolean firstTime = true;
+    
+    private boolean useDefaultBag = false;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -211,6 +215,17 @@
     @Override
     public Result getNext(Tuple t) throws ExecException {
         Tuple res;
+        
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+        
         if(distinct) {
             // only set the key which has the whole
             // tuple 
@@ -232,20 +247,14 @@
                 
             } else {
                 // create bag to pull all tuples out of iterator
-                String bagType = null;
-                if (PigMapReduce.sJobConf != null) {
-                       bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
-                   }
-                                
-
-                for (int i = 0; i < numInputs; i++) {
-                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                           dbs[i] = mBagFactory.newDefaultBag();
-                       } else {
-                        dbs[i] = new InternalCachedBag(numInputs);
-                    }
-                }      
-                               
+                for (int i = 0; i < numInputs; i++) {
+                    dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                    // In the very rare case where a POStream follows this
+                    // POPackage in the pipeline and is also blocking the pipeline,
+                    // the constructor argument should be 2 * numInputs. But for that one
+                    // obscure case we don't want to pay the penalty all the time.
+                            : new InternalCachedBag(numInputs);
+                }
                 //For each indexed tup in the inp, sort them
                 //into their corresponding bags based
                 //on the index
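
A usage note, not part of the patch itself (the configuration mechanics are an assumption about a typical Pig setup): the operators only read pig.cachedbag.type from the job configuration, so a user who wants the previous behaviour back would set

    pig.cachedbag.type=default

for example in pig.properties or as a -D property when launching Pig. Any other value, or leaving the property unset, selects the proactively spilling InternalCachedBag.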

