Author: gates
Date: Mon Nov 10 17:23:00 2008
New Revision: 712900

URL: http://svn.apache.org/viewvc?rev=712900&view=rev
Log:
PIG-484 Work by Pradeep to use the streaming join optimization for group by 
queries as well.


Modified:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Nov 10 17:23:00 2008
@@ -25,6 +25,7 @@
 
 import org.apache.pig.data.DataType;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -97,9 +98,12 @@
     private int mKeyField = -1;
 
     private byte mKeyType = 0;
+    
+    private String chunkSize;
 
-    public CombinerOptimizer(MROperPlan plan) {
+    public CombinerOptimizer(MROperPlan plan, String chunkSize) {
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        this.chunkSize = chunkSize;
     }
 
     @Override
@@ -194,7 +198,12 @@
                     fixUpRearrange(clr);
                     mr.combinePlan.add(clr);
                     mr.combinePlan.connect(cfe, clr);
-
+                    
+                    // stream input to the algebraics in the 
+                    // combine plan
+                    LastInputStreamingOptimizer.replaceWithPOJoinPackage(
+                            mr.combinePlan, cp, cfe, chunkSize);
+                    
                     // Use the ExprType list returned from algebraic to tell
                     // POPostCombinerPackage which fields need projected and
                     // which placed in bags.

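For context on the CombinerOptimizer change above: once the combine plan is built, its POPackage/POForEach pair is replaced with a POJoinPackage via the shared replaceWithPOJoinPackage helper, so the algebraic functions in the combiner receive the grouped input in bounded chunks instead of one fully materialized bag per key. The following is a minimal, self-contained Java sketch of that chunked-streaming idea only; the names and the toy SUM-style aggregate are made up and are not Pig operators.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of chunked streaming: instead of materializing
// every value for a key before processing, hand the consumer fixed-size
// chunks so memory stays bounded (the role POJoinPackage's chunkSize plays).
public class ChunkedStreamingSketch {

    // Drain the iterator in chunks of at most chunkSize and fold each chunk
    // into a running partial result (standing in for an algebraic combine step).
    static long streamInChunks(Iterator<Long> values, int chunkSize) {
        long partial = 0;                        // running partial aggregate
        List<Long> chunk = new ArrayList<>(chunkSize);
        while (values.hasNext()) {
            chunk.add(values.next());
            if (chunk.size() == chunkSize || !values.hasNext()) {
                for (long v : chunk) {           // process one bounded chunk
                    partial += v;                // e.g. a SUM-style algebraic
                }
                chunk.clear();                   // release the chunk before reading the next one
            }
        }
        return partial;
    }

    public static void main(String[] args) {
        List<Long> data = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        System.out.println(streamInChunks(data.iterator(), 3)); // prints 28
    }
}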
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Mon Nov 10 17:23:00 2008
@@ -18,7 +18,7 @@
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
@@ -116,9 +116,12 @@
         comp.randomizeFileLocalizer();
         comp.compile();
         MROperPlan plan = comp.getMRPlan();
+        String lastInputChunkSize = 
+            pc.getProperties().getProperty(
+                    "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
         String prop = System.getProperty("pig.exec.nocombiner");
         if (!("true".equals(prop)))  {
-            CombinerOptimizer co = new CombinerOptimizer(plan);
+            CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
             co.visit();
         }
         
@@ -131,9 +134,9 @@
         checker.visit();
 
         // optimize joins
-        CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan,
-                pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
-        cgso.visit();
+        LastInputStreamingOptimizer liso = 
+            new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+        liso.visit();
 
         // figure out the type of the key for the map plan
         // this is needed when the key is null to create

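Both launchers now read a single property, last.input.chunksize, and pass it to the CombinerOptimizer as well as to the LastInputStreamingOptimizer; the old join.biggest.input.chunksize lookup is gone. Below is a small sketch of the lookup-with-default pattern using plain java.util.Properties; the default value here is made up (the real default is POJoinPackage.DEFAULT_CHUNK_SIZE).

import java.util.Properties;

// Sketch of the property-with-default lookup the launchers perform.
// The property name matches the patch; the default value is a stand-in.
public class ChunkSizeLookup {
    static final String HYPOTHETICAL_DEFAULT_CHUNK_SIZE = "1000";

    public static void main(String[] args) {
        Properties props = new Properties();
        // A user could override the chunk size, e.g.:
        // props.setProperty("last.input.chunksize", "500");
        String lastInputChunkSize =
            props.getProperty("last.input.chunksize", HYPOTHETICAL_DEFAULT_CHUNK_SIZE);
        long chunkSize = Long.parseLong(lastInputChunkSize); // parsed later, as setChunkSize does
        System.out.println("chunk size = " + chunkSize);
    }
}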
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 10 17:23:00 2008
@@ -1176,11 +1176,11 @@
         return mro;
     }
 
-    static class CoGroupStreamingOptimizerVisitor extends MROpPlanVisitor {
+    static class LastInputStreamingOptimizer extends MROpPlanVisitor {
         
         Log log = LogFactory.getLog(this.getClass());
         String chunkSize;
-        CoGroupStreamingOptimizerVisitor(MROperPlan plan, String chunkSize) {
+        LastInputStreamingOptimizer(MROperPlan plan, String chunkSize) {
             super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
             this.chunkSize = chunkSize;
         }
@@ -1199,12 +1199,14 @@
             // 3. No combiner plan
             // 4. POForEach nested plan only contains POProject in any depth
             // 5. Inside POForEach, all occurrences of the last input are flattened
+            
             if (mr.mapPlan.isEmpty()) return;
             if (mr.reducePlan.isEmpty()) return;
 
             // Check combiner plan
-            if (!mr.combinePlan.isEmpty())
+            if (!mr.combinePlan.isEmpty()) {
                 return;
+            }
             
             // Check map plan
             List<PhysicalOperator> mpLeaves = mr.mapPlan.getLeaves();
@@ -1313,44 +1315,50 @@
                 if (lastInputFlattened && allSimple && projOfLastInput != null)
                 {
                     // Now we can optimize the map-reduce plan
-                    
                     // Replace POPackage->POForeach to POJoinPackage
-                    String scope = pack.getOperatorKey().scope;
-                    NodeIdGenerator nig = NodeIdGenerator.getGenerator();
-                    POJoinPackage joinPackage;
-                    joinPackage = new POJoinPackage(
-                                new OperatorKey(scope, nig.getNextNodeId(scope)), 
-                                -1, pack, forEach);
-                    joinPackage.setChunkSize(Long.parseLong(chunkSize));
-                    PhysicalOperator nextOp = null;
-                    List<PhysicalOperator> succs = mr.reducePlan.getSuccessors(forEach);
-                    if (succs!=null)
-                    {
-                        if (succs.size()!=1)
-                        {
-                            String msg = new String("forEach can only have one successor");
-                            log.error(msg);
-                            throw new VisitorException(msg);
-                        }
-                        nextOp = succs.get(0);
-                    }
-                    mr.reducePlan.remove(pack);
-                    
-                    try {
-                        mr.reducePlan.replace(forEach, joinPackage);
-                    } catch (PlanException e) {
-                        String msg = new String("Error rewrite POJoinPackage");
-                        log.error(msg);
-                        throw new VisitorException(msg, e);
-                    }
-                    
-                    log.info("Rewrite: POPackage->POForEach to POJoinPackage");
+                    replaceWithPOJoinPackage(mr.reducePlan, pack, forEach, chunkSize);
                 }
             }
         }
 
+        public static void replaceWithPOJoinPackage(PhysicalPlan plan,
+                POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
+            String scope = pack.getOperatorKey().scope;
+            NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+            POJoinPackage joinPackage;
+            joinPackage = new POJoinPackage(
+                        new OperatorKey(scope, nig.getNextNodeId(scope)), 
+                        -1, pack, forEach);
+            joinPackage.setChunkSize(Long.parseLong(chunkSize));
+            List<PhysicalOperator> succs = plan.getSuccessors(forEach);
+            if (succs!=null)
+            {
+                if (succs.size()!=1)
+                {
+                    String msg = new String("forEach can only have one successor");
+                    LogFactory.
+                        getLog(LastInputStreamingOptimizer.class).error(msg);
+                    throw new VisitorException(msg);
+                }
+            }
+            plan.remove(pack);
+            
+            try {
+                plan.replace(forEach, joinPackage);
+            } catch (PlanException e) {
+                String msg = new String("Error rewrite POJoinPackage");
+                LogFactory.
+                getLog(LastInputStreamingOptimizer.class).error(msg);
+                throw new VisitorException(msg, e);
+            }
+            
+            LogFactory.
+            getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+        }
+
     }
     
+    
     private class RearrangeAdjuster extends MROpPlanVisitor {
 
         RearrangeAdjuster(MROperPlan plan) {

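The core rewrite in MRCompiler is now the static helper replaceWithPOJoinPackage, so the same POPackage + POForEach to POJoinPackage replacement can be applied to either a reduce plan or a combine plan. The toy sketch below mimics the shape of that rewrite (check the successor constraint, drop the package operator, swap in the fused operator); the "plan" here is just a list of operator names and is not a Pig class.

import java.util.ArrayList;
import java.util.List;

// Toy illustration of fusing two adjacent plan operators into one,
// mirroring the POPackage + POForEach -> POJoinPackage rewrite.
// All types and names here are hypothetical stand-ins.
public class FuseOperatorsSketch {

    // A "plan" reduced to a linear chain of operator names.
    static void fusePackageAndForEach(List<String> plan) {
        int pkg = plan.indexOf("Package");
        int fe  = plan.indexOf("ForEach");
        if (pkg < 0 || fe != pkg + 1) {
            return; // preconditions not met: nothing to rewrite
        }
        // The real optimizer also insists the ForEach has at most one
        // successor; in a linear chain that is every operator after it.
        if (plan.size() - fe - 1 > 1) {
            throw new IllegalStateException("forEach can only have one successor");
        }
        plan.remove(pkg);                 // drop the package operator
        plan.set(pkg, "JoinPackage");     // replace the foreach with the fused operator
    }

    public static void main(String[] args) {
        List<String> reducePlan = new ArrayList<>(List.of("Package", "ForEach", "Store"));
        fusePackageAndForEach(reducePlan);
        System.out.println(reducePlan);   // [JoinPackage, Store]
    }
}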
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Nov 10 17:23:00 2008
@@ -33,7 +33,7 @@
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
@@ -133,9 +133,12 @@
         comp.randomizeFileLocalizer();
         comp.compile();
         MROperPlan plan = comp.getMRPlan();
+        String lastInputChunkSize = 
+            pc.getProperties().getProperty(
+                    "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
         String prop = System.getProperty("pig.exec.nocombiner");
         if (!("true".equals(prop)))  {
-            CombinerOptimizer co = new CombinerOptimizer(plan);
+            CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
             co.visit();
         }
         
@@ -148,9 +151,9 @@
         checker.visit();
         
         // optimize joins
-        CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan, 
-                pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
-        cgso.visit();
+        LastInputStreamingOptimizer liso = 
+            new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+        liso.visit();
+        liso.visit();
         
         // figure out the type of the key for the map plan
         // this is needed when the key is null to create

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Mon Nov 10 17:23:00 2008
@@ -39,6 +39,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TargetedTuple;
@@ -58,6 +59,8 @@
             Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
         private final Log log = LogFactory.getLog(getClass());
 
+        private final static Tuple DUMMYTUPLE = null;
+        
         private byte keyType;
         
         //The reduce plan
@@ -118,17 +121,40 @@
             
             pigReporter.setRep(reporter);
             
-            pack.attachInput(key, tupIter);
+            // In the case we optimize, we combine
+            // POPackage and POForeach - so we could get many
+            // tuples out of the getnext() call of POJoinPackage
+            // In this case, we process till we see EOP from 
+            // POJoinPacakage.getNext()
+            if (pack instanceof POJoinPackage)
+            {
+                pack.attachInput(key, tupIter);
+                while (true)
+                {
+                    if (processOnePackageOutput(oc))
+                        break;
+                }
+            }
+            else {
+                // not optimized, so package will
+                // give only one tuple out for the key
+                pack.attachInput(key, tupIter);
+                processOnePackageOutput(oc);
+            }
             
+        }
+        
+        // return: false-more output
+        //         true- end of processing
+        public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException {
             try {
-                Tuple t=null;
-                Result res = pack.getNext(t);
+                Result res = pack.getNext(DUMMYTUPLE);
                 if(res.returnStatus==POStatus.STATUS_OK){
                     Tuple packRes = (Tuple)res.result;
                     
                     if(cp.isEmpty()){
                         oc.collect(null, packRes);
-                        return;
+                        return false;
                     }
                     
                     cp.attachInput(packRes);
@@ -137,7 +163,7 @@
 
                     PhysicalOperator leaf = leaves.get(0);
                     while(true){
-                        Result redRes = leaf.getNext(t);
+                        Result redRes = leaf.getNext(DUMMYTUPLE);
                         
                         if(redRes.returnStatus==POStatus.STATUS_OK){
                             Tuple tuple = (Tuple)redRes.result;
@@ -157,7 +183,7 @@
                         }
                         
                         if(redRes.returnStatus==POStatus.STATUS_EOP)
-                            return;
+                            break;
                         
                         if(redRes.returnStatus==POStatus.STATUS_NULL)
                             continue;
@@ -171,22 +197,27 @@
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_NULL)
-                    return;
+                    return false;
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
                     IOException ioe = new IOException("Packaging error while processing group");
                     throw ioe;
                 }
+                
+                if(res.returnStatus==POStatus.STATUS_EOP) {
+                    return true;
+                }
                     
+                return false;    
                 
             } catch (ExecException e) {
                 IOException ioe = new IOException(e.getMessage());
                 ioe.initCause(e.getCause());
                 throw ioe;
             }
+
         }
         
-        
         /**
          * Will be called once all the intermediate keys and values are
          * processed. So right place to stop the reporter thread.

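The combiner now follows the same contract as the reducer's processOnePackageOutput: the method returns false while more output may still come for the current key and true once POJoinPackage reports EOP, and the caller simply loops until it sees true. Below is a minimal, self-contained sketch of that drive-until-EOP loop; the status enum and producer are hypothetical stand-ins for the Pig operators.

import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the "loop until end-of-processing" driver the
// combiner uses when the package can emit many tuples per key (POJoinPackage).
public class DriveUntilEopSketch {

    enum Status { OK, NULL, EOP, ERR }       // stand-in for POStatus codes

    // Stand-in for POJoinPackage.getNext(): emits each value, then EOP.
    static class ChunkProducer {
        private final Iterator<String> it;
        ChunkProducer(List<String> values) { this.it = values.iterator(); }
        Status getNext(StringBuilder out) {
            if (!it.hasNext()) return Status.EOP;
            out.append(it.next());
            return Status.OK;
        }
    }

    // Returns true at end of processing, false while more output may follow,
    // matching the processOnePackageOutput convention in the patch.
    static boolean processOneOutput(ChunkProducer producer) {
        StringBuilder tuple = new StringBuilder();
        Status status = producer.getNext(tuple);
        if (status == Status.OK)   { System.out.println("collect " + tuple); return false; }
        if (status == Status.NULL) return false;
        if (status == Status.ERR)  throw new RuntimeException("Packaging error while processing group");
        return true;                          // EOP: this key is done
    }

    public static void main(String[] args) {
        ChunkProducer producer = new ChunkProducer(List.of("chunk-1", "chunk-2", "chunk-3"));
        while (true) {                        // same shape as the combiner's new loop
            if (processOneOutput(producer)) break;
        }
    }
}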
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov 10 17:23:00 2008
@@ -77,6 +77,7 @@
 public class PigMapReduce {
 
     public static JobConf sJobConf = null;
+    private final static Tuple DUMMYTUPLE = null;
     
     public static class Map extends PigMapBase implements
             Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
@@ -229,8 +230,7 @@
         public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException
         {
             try {
-                Tuple t=null;
-                Result res = pack.getNext(t);
+                Result res = pack.getNext(DUMMYTUPLE);
                 if(res.returnStatus==POStatus.STATUS_OK){
                     Tuple packRes = (Tuple)res.result;
                     
@@ -277,8 +277,7 @@
         protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
             while(true)
             {
-                Tuple dummyTuple = null;  
-                Result redRes = leaf.getNext(dummyTuple);
+                Result redRes = leaf.getNext(DUMMYTUPLE);
                 if(redRes.returnStatus==POStatus.STATUS_OK){
                     outputCollector.collect(null, (Tuple)redRes.result);
                     continue;
@@ -406,8 +405,7 @@
             pack.attachInput(key, tupIter);
             
             try {
-                Tuple t=null;
-                Result res = pack.getNext(t);
+                Result res = pack.getNext(DUMMYTUPLE);
                 if(res.returnStatus==POStatus.STATUS_OK){
                     Tuple packRes = (Tuple)res.result;
                     

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Mon Nov 10 17:23:00 2008
@@ -26,6 +26,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
@@ -54,7 +55,7 @@
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
         
-        // POPackage could be present in the combine plan
+        // POPackage OR POJoinPackage could be present in the combine plan
         // OR in the reduce plan. POPostCombinerPackage could
         // be present only in the reduce plan. Search in these two
         // plans accordingly
@@ -139,6 +140,15 @@
         };
         
         /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
+         */
+        @Override
+        public void visitJoinPackage(POJoinPackage joinPackage)
+                throws VisitorException {
+            this.pkg = joinPackage;
+        }
+        
+        /* (non-Javadoc)
          * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
          */
         @Override


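Finally, POPackageAnnotator's inner visitor gains a visitJoinPackage override, so a POJoinPackage in the reduce plan is annotated the same way a plain POPackage is. The fragment below is a generic sketch of that visitor double dispatch with hypothetical classes; it is not the Pig visitor hierarchy.

// Hypothetical sketch of the visitor double dispatch behind the change:
// the subclass operator (JoinPackage) routes to its own visit method, which
// records the operator just like the base-class visit does.
public class PackageVisitorSketch {

    static class Package {
        void accept(Visitor v) { v.visitPackage(this); }
    }

    static class JoinPackage extends Package {
        @Override
        void accept(Visitor v) { v.visitJoinPackage(this); }
    }

    static class Visitor {
        Package pkg;                                      // the operator to annotate later
        void visitPackage(Package p)         { this.pkg = p; }
        void visitJoinPackage(JoinPackage p) { this.pkg = p; } // new override, in the patch's spirit
    }

    public static void main(String[] args) {
        Visitor v = new Visitor();
        new JoinPackage().accept(v);
        System.out.println(v.pkg.getClass().getSimpleName()); // JoinPackage
    }
}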