Author: pradeepkth
Date: Fri Aug 14 17:53:23 2009
New Revision: 804309

URL: http://svn.apache.org/viewvc?rev=804309&view=rev
Log:
PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)

Added:
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 14 17:53:23 2009
@@ -25,6 +25,9 @@
 PIG-734:  Changed maps to only take strings as keys (gates).
 
 IMPROVEMENTS
+
+PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)
+
 PIG-893:  Added string -> integer, long, float, and double casts (zjffdu via 
gates). 
 
 PIG-833: Added Zebra, new columnar storage mechanism for HDFS (rangadi plus 
many others via gates)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Aug 14 17:53:23 2009
@@ -110,6 +110,8 @@
 
     public static final String LOG_DIR = "_logs";
 
+    public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
+    
     // A mapping of job to pair of store locations and tmp locations for that 
job
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
@@ -442,11 +444,11 @@
                 jobConf.setMapperClass(PigMapOnly.Map.class);
                 jobConf.setNumReduceTasks(0);
                 jobConf.set("pig.mapPlan", 
ObjectSerializer.serialize(mro.mapPlan));
-                if(mro.isStreamInMap()) {
+                if(mro.isEndOfAllInputSetInMap()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
-                    // The pipeline is rerun only if there was a stream
-                    jobConf.set("pig.stream.in.map", "true");
+                    // The pipeline is rerun if there was either a stream or a 
POMergeJoin
+                    jobConf.set(END_OF_INP_IN_MAP, "true");
                 }
             }
             else{
@@ -470,14 +472,14 @@
                     jobConf.setNumReduceTasks(mro.requestedParallelism);
 
                 jobConf.set("pig.mapPlan", 
ObjectSerializer.serialize(mro.mapPlan));
-                if(mro.isStreamInMap()) {
+                if(mro.isEndOfAllInputSetInMap()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
-                    // The pipeline is rerun only if there was a stream
-                    jobConf.set("pig.stream.in.map", "true");
+                    // The pipeline is rerun only if there was a stream or 
merge-join.
+                    jobConf.set(END_OF_INP_IN_MAP, "true");
                 }
                 jobConf.set("pig.reducePlan", 
ObjectSerializer.serialize(mro.reducePlan));
-                if(mro.isStreamInReduce()) {
+                if(mro.isEndOfAllInputSetInReduce()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
                     // The pipeline is rerun only if there was a stream

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Aug 14 17:53:23 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +38,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.MergeJoinIndexer;
 import org.apache.pig.impl.builtin.TupleSize;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -63,6 +65,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -82,6 +85,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
@@ -996,6 +1000,239 @@
         }
     }
 
+    /** Since merge-join works on two inputs there are exactly two MROper 
predecessors identified  as left and right.
+     *  Instead of merging two operators, both are used to generate a MR job 
each. First MR oper is run to generate on-the-fly index on right side.
+     *  Second is used to actually do the join. First MR oper is identified as 
rightMROper and second as curMROper.
+
+     *  1) RightMROper: If it is in map phase, it can be preceded only by 
POLoad. If there is anything else
+     *                  in physical plan, that is yanked and set as inner 
plans of joinOp.
+     *                  If it is in reduce phase, close this operator and start 
new MROper.
+     *  2) LeftMROper:  If it is in map phase, add the Join operator in it.
+     *                  If it is in reduce phase, close it and start new 
MROper.
+     */
+
+    @Override
+    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+
+        try{
+            if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2)
+                throw new MRCompilerException("Merge Join must have exactly 
two inputs. Found : "+compiledInputs.length, 1101);
+
+            OperatorKey leftPhyOpKey = 
joinOp.getInputs().get(0).getOperatorKey();
+            OperatorKey rightPhyOpKey = 
joinOp.getInputs().get(1).getOperatorKey();
+
+            // Currently we assume that physical operator preceding 
POMergeJoin in the physical plan is present in MROperators found in 
compiledInputs[].
+            // This may not always hold. e.g., if there is an order-by before 
merge join.
+            
+            
if(compiledInputs[0].mapPlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey)
 || 
compiledInputs[0].reducePlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey))
+                curMROp = compiledInputs[0];
+            
+            else 
if(compiledInputs[1].mapPlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey)
 || 
compiledInputs[1].reducePlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey))
+                curMROp = compiledInputs[1];
+            
+            else{ // This implies predecessor of left input is not found in 
compiled Inputs.
+                int errCode = 2169;
+                String errMsg = "Physical operator preceding left predicate 
not found in compiled MR jobs.";
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            }
+             
+            MapReduceOper rightMROpr = null;
+            
if(compiledInputs[1].mapPlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey)
 || 
compiledInputs[1].reducePlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey))
+                rightMROpr = compiledInputs[1];
+            
+            else 
if(compiledInputs[0].mapPlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey)
 || 
compiledInputs[0].reducePlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey))
+                rightMROpr = compiledInputs[0];
+            
+            else{ // This implies predecessor of right input is not found in 
compiled Inputs.
+                int errCode = 2169;
+                String errMsg = "Physical operator preceding right predicate 
not found in compiled MR jobs.";
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            } 
+            
+            if(curMROp == null || rightMROpr == null){
+                
+                // This implies either of compiledInputs[0] or 
compiledInputs[1] is null.
+                int errCode = 2173;
+                String errMsg = "One of the preceding compiled MR operator is 
null. This is not expected.";
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            }
+            
+            if(curMROp.equals(rightMROpr)){
+                int errCode = 2170;
+                String errMsg = "Physical operator preceding both left and 
right predicate found to be same. This is not expected.";
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            }
+                
+            // We will first operate on right side which is indexer job.
+            // First yank plan of the compiled right input and set that as an 
inner plan of right operator.
+            if(!rightMROpr.mapDone){
+                PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
+                if(rightMapPlan.getRoots().size() != 1){
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more then one root 
physical operator in physical plan.";
+                    throw new 
MRCompilerException(errMsg,errCode,PigException.BUG);
+                }
+                
+                PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
+                if(! (rightLoader instanceof POLoad)){
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be 
POLoad. Found : "+rightLoader.getClass().getCanonicalName();
+                    throw new MRCompilerException(errMsg,errCode);
+                }
+                
+                if (rightMapPlan.getSuccessors(rightLoader) == null || 
rightMapPlan.getSuccessors(rightLoader).isEmpty())
+                    // Load - Join case.
+                    joinOp.setupRightPipeline(null); 
+                
+                else{ // We got something on right side. Yank it and set it as 
inner plan of right input.
+                    PhysicalPlan rightPipelinePlan = rightMapPlan.clone();
+                    PhysicalOperator root = 
rightPipelinePlan.getRoots().get(0);
+                    rightPipelinePlan.disconnect(root, 
rightPipelinePlan.getSuccessors(root).get(0));
+                    rightPipelinePlan.remove(root);
+                    joinOp.setupRightPipeline(rightPipelinePlan);
+                    rightMapPlan.trimBelow(rightLoader);
+                }
+            }
+            
+            else if(!rightMROpr.reduceDone){ 
+                // Indexer must run in map. If we are in reduce, close it and 
start new MROper.
+                // No need of yanking in this case. Since we are starting 
brand new MR Operator and it will contain nothing.
+                joinOp.setupRightPipeline(null);
+                POStore rightStore = getStore();
+                FileSpec rightStrFile = getTempFileSpec();
+                rightStore.setSFile(rightStrFile);
+                rightMROpr.setReduceDone(true);
+                rightMROpr = startNew(rightStrFile, rightMROpr);
+            }
+            
+            else{
+                int errCode = 2022;
+                String msg = "Both map and reduce phases have been done. This 
is unexpected while compiling.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+            
+            // At this point, we must be operating on map plan of right input 
and it would contain nothing else other than a POLoad.
+            POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
+            
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+
+            // Replace POLoad with  indexer.
+            String[] indexerArgs = new String[2];
+            indexerArgs[0] = rightLoader.getLFile().getFuncName();
+            List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+            indexerArgs[1] = 
ObjectSerializer.serialize((Serializable)rightInpPlans); 
+            FileSpec lFile = new 
FileSpec(rightLoader.getLFile().getFileName(),new 
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+            rightLoader.setLFile(lFile);
+
+            // Loader of mro will return a tuple of form (key1, key2, 
..,filename, offset)
+            // Now set up a POLocalRearrange which has "all" as the key and 
tuple fetched
+            // by loader as the "value" of POLocalRearrange
+            // Sorting of index can possibly be achieved by using Hadoop 
sorting between map and reduce instead of Pig doing sort. If that is so, 
+            // it will simplify lot of the code below.
+            
+            PhysicalPlan lrPP = new PhysicalPlan();
+            ConstantExpression ce = new ConstantExpression(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            ce.setValue("all");
+            ce.setResultType(DataType.CHARARRAY);
+            lrPP.add(ce);
+
+            List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
+            lrInnerPlans.add(lrPP);
+
+            POLocalRearrange lr = new POLocalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            lr.setIndex(0);
+            lr.setKeyType(DataType.CHARARRAY);
+            lr.setPlans(lrInnerPlans);
+            lr.setResultType(DataType.TUPLE);
+            rightMROpr.mapPlan.addAsLeaf(lr);
+
+            rightMROpr.setMapDone(true);
+
+            // On the reduce side of this indexing job, there will be a global 
rearrange followed by POSort.
+            // Output of POSort will be index file dumped on the DFS.
+
+            // First add POPackage.
+            POPackage pkg = new POPackage(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            pkg.setKeyType(DataType.CHARARRAY);
+            pkg.setNumInps(1); 
+            pkg.setInner(new boolean[]{false});
+            rightMROpr.reducePlan.add(pkg);
+
+            // Next project tuples from the bag created by POPackage.
+            POProject topPrj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            topPrj.setColumn(1);
+            topPrj.setResultType(DataType.TUPLE);
+            topPrj.setOverloaded(true);
+            rightMROpr.reducePlan.add(topPrj);
+            rightMROpr.reducePlan.connect(pkg, topPrj);
+
+            // Now create and add POSort. Sort plan is project *.
+            List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
+            PhysicalPlan innerSortPlan = new PhysicalPlan();
+            POProject prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            innerSortPlan.add(prj);
+            sortPlans.add(innerSortPlan);
+
+            // Currently we assume all columns are in asc order.
+            // Add two because filename and offset are added by Indexer in 
addition to keys.
+            List<Boolean>  mAscCols = new 
ArrayList<Boolean>(rightInpPlans.size()+2);
+            for(int i=0; i< rightInpPlans.size()+2; i++)
+                mAscCols.add(true);
+
+            POSort sortOp = new POSort(new 
OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
+            rightMROpr.reducePlan.add(sortOp);
+            rightMROpr.reducePlan.connect(topPrj, sortOp);
+
+            POStore st = getStore();
+            FileSpec strFile = getTempFileSpec();
+            st.setSFile(strFile);
+            rightMROpr.reducePlan.addAsLeaf(st);
+            rightMROpr.setReduceDone(true);
+   
+            joinOp.setIndexFile(strFile);
+            
+            // We are done with right side. Lets work on left now.
+            // Join will be materialized in leftMROper.
+            if(!curMROp.mapDone) // Life is easy 
+                curMROp.mapPlan.addAsLeaf(joinOp);
+            
+            else if(!curMROp.reduceDone){  // This is a map-side join. Close 
this MROper and start afresh.
+                POStore leftStore = getStore();
+                FileSpec leftStrFile = getTempFileSpec();
+                leftStore.setSFile(leftStrFile);
+                curMROp.setReduceDone(true);
+                curMROp = startNew(leftStrFile, curMROp);
+                curMROp.mapPlan.addAsLeaf(joinOp);
+            }
+            
+            else{
+                int errCode = 2022;
+                String msg = "Both map and reduce phases have been done. This 
is unexpected while compiling.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
+            // We want to ensure indexing job runs prior to actual join job. 
So, connect them in order.
+            MRPlan.connect(rightMROpr, curMROp);
+        }
+        catch(PlanException e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + 
joinOp.getClass().getCanonicalName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+       catch (IOException e){
+           int errCode = 3000;
+           String errMsg = "IOException caught while compiling POMergeJoin";
+            throw new MRCompilerException(errMsg, errCode,e);
+        }
+       catch(CloneNotSupportedException e){
+           int errCode = 2127;
+           String errMsg = "Cloning exception caught while compiling 
POMergeJoin";
+           throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+       }
+    }
+
     @Override
     public void visitDistinct(PODistinct op) throws VisitorException {
         try{

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Fri Aug 14 17:53:23 2009
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
@@ -43,10 +42,10 @@
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
@@ -54,7 +53,6 @@
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
@@ -361,7 +359,7 @@
         // check whether stream operator is present
         // after MultiQueryOptimizer because it can shift streams from
         // map to reduce, etc.
-        MRStreamHandler checker = new MRStreamHandler(plan);
+        EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
         checker.visit();
         
         return plan;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Fri Aug 14 17:53:23 2009
@@ -68,13 +68,13 @@
     //is complete
     boolean reduceDone = false;
     
-    // Indicates that there is POStream in the 
+    // Indicates that there is an operator which uses endOfAllInput flag in 
the 
     // map plan
-    boolean streamInMap = false;
+    boolean endOfAllInputInMap = false;
     
-    // Indicates that there is POStream in the 
+    // Indicates that there is an operator which uses endOfAllInput flag in 
the 
     // reduce plan
-    boolean streamInReduce = false;
+    boolean endOfAllInputInReduce = false;;
     
     //Indicates if this job is an order by job
     boolean globalSort = false;
@@ -306,33 +306,32 @@
     }
 
     /**
-     * @return whether there is a POStream in the map plan
+     * @return whether end of all input is set in the map plan
      */
-    public boolean isStreamInMap() {
-        return streamInMap;
+    public boolean isEndOfAllInputSetInMap() {
+        return endOfAllInputInMap;
     }
 
     /**
-     * @param streamInMap the streamInMap to set
+     * @param endOfAllInputInMap the endOfAllInputInMap to set
      */
-    public void setStreamInMap(boolean streamInMap) {
-        this.streamInMap = streamInMap;
+    public void setEndOfAllInputInMap(boolean endOfAllInputInMap) {
+        this.endOfAllInputInMap = endOfAllInputInMap;
     }
 
     /**
-     * @return whether there is a POStream in the reduce plan
+     * @return whether end of all input is set in the reduce plan
      */
-    public boolean isStreamInReduce() {
-        return streamInReduce;
+    public boolean isEndOfAllInputSetInReduce() {
+        return endOfAllInputInReduce;
     }
 
     /**
-     * @param streamInReduce the streamInReduce to set
+     * @param endOfAllInputInReduce the endOfAllInputInReduce to set
      */
-    public void setStreamInReduce(boolean streamInReduce) {
-        this.streamInReduce = streamInReduce;
-    }
-    
+    public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
+        this.endOfAllInputInReduce = endOfAllInputInReduce;
+    }    
     public int getFragment() {
         return fragment;
     }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Fri Aug 14 17:53:23 2009
@@ -970,12 +970,12 @@
     
     private void mergeMROperProperties(MapReduceOper from, MapReduceOper to) {
 
-        if (from.isStreamInMap()) {
-            to.setStreamInMap(true);
+        if (from.isEndOfAllInputSetInMap()) {
+            to.setEndOfAllInputInMap(true);
         }
 
-        if (from.isStreamInReduce()) {
-            to.setStreamInReduce(true);
+        if (from.isEndOfAllInputSetInReduce()) {
+            to.setEndOfAllInputInReduce(true);
         }
         
         if (from.getRequestedParallelism() > to.getRequestedParallelism()) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
 Fri Aug 14 17:53:23 2009
@@ -252,6 +252,11 @@
     }
     
     @Override
+    public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+        join.setParentPlan(parent);
+    }
+
+    @Override
     public void visitSkewedJoin(POSkewedJoin join) throws VisitorException {
         join.setParentPlan(parent);
     }   

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Fri Aug 14 17:53:23 2009
@@ -33,7 +33,6 @@
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -45,7 +44,6 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -96,13 +94,13 @@
             return;
         }
             
-        if(PigMapReduce.sJobConf.get("pig.stream.in.map", 
"false").equals("true")) {
-            // If there is a stream in the pipeline we could 
+        if(PigMapReduce.sJobConf.get(JobControlCompiler.END_OF_INP_IN_MAP, 
"false").equals("true")) {
+            // If there is a stream in the pipeline or if this map job belongs 
to merge-join we could 
             // potentially have more to process - so lets
             // set the flag stating that all map input has been sent
             // already and then lets run the pipeline one more time
             // This will result in nothing happening in the case
-            // where there is no stream in the pipeline
+            // where there is no stream or it is not a merge-join in the 
pipeline
             mp.endOfAllInput = true;
             try {
                 runPipeline(leaf);

Added: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=804309&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 Fri Aug 14 17:53:23 2009
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This visitor visits the MRPlan and does the following
+ * for each MROper: if the map plan or the reduce plan of the MROper
+ * contains an operator that needs the end-of-all-input flag (a POStream or a
+ * POMergeJoin), this marks on the MROper that the map or the reduce needs it.
+ *  
+ */
+public class EndOfAllInputSetter extends MROpPlanVisitor {
+
+    /**
+     * @param plan MR plan to visit
+     */
+    public EndOfAllInputSetter(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        
+        EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
+        checker.visit();
+        if(checker.isEndOfAllInputPresent()) {
+            mr.setEndOfAllInputInMap(true);            
+        }
+        
+        checker = new EndOfAllInputChecker(mr.reducePlan);
+        checker.visit();
+        if(checker.isEndOfAllInputPresent()) {
+            mr.setEndOfAllInputInReduce(true);            
+        }      
+        
+    }
+
+    static class EndOfAllInputChecker extends PhyPlanVisitor {
+        
+        private boolean endOfAllInputFlag = false;
+        public EndOfAllInputChecker(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
+        }
+        
+        /* (non-Javadoc)
+         * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
+         */
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            // stream present
+            endOfAllInputFlag = true;
+        }
+        
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            // merge join present
+            endOfAllInputFlag = true;
+        }
+        /**
+         * @return if end of all input is present
+         */
+        public boolean isEndOfAllInputPresent() {
+            return endOfAllInputFlag;
+        }
+    }
+}
\ No newline at end of file

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
 Fri Aug 14 17:53:23 2009
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
-
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.VisitorException;
-
-/**
- * This visitor visits the MRPlan and does the following
- * for each MROper
- *  - If the map plan or the reduce plan of the MROper has
- *  a POStream in it, this marks in the MROper whether the map 
- * has a POStream or if the reduce has a POStream.
- *  
- */
-public class MRStreamHandler extends MROpPlanVisitor {
-
-    /**
-     * @param plan MR plan to visit
-     */
-    public MRStreamHandler(MROperPlan plan) {
-        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
-    }
-
-    @Override
-    public void visitMROp(MapReduceOper mr) throws VisitorException {
-        
-        StreamChecker checker = new StreamChecker(mr.mapPlan);
-        checker.visit();
-        if(checker.isStreamPresent()) {
-            mr.setStreamInMap(true);            
-        }
-        
-        checker = new StreamChecker(mr.reducePlan);
-        checker.visit();
-        if(checker.isStreamPresent()) {
-            mr.setStreamInReduce(true);            
-        }      
-        
-    }
-
-    class StreamChecker extends PhyPlanVisitor {
-        
-        private boolean streamPresent = false;
-        public StreamChecker(PhysicalPlan plan) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
-        }
-        
-        /* (non-Javadoc)
-         * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
-        @Override
-        public void visitStream(POStream stream) throws VisitorException {
-            // stream present
-            streamPresent = true;
-        }
-
-        /**
-         * @return if stream is present
-         */
-        public boolean isStreamPresent() {
-            return streamPresent;
-        }
-    }
-}
-

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Fri Aug 14 17:53:23 2009
@@ -56,6 +56,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 
+import org.apache.pig.impl.util.LinkedMultiMap;
 import org.apache.pig.impl.util.MultiMap;
 
 public class LogToPhyTranslationVisitor extends LOVisitor {
@@ -723,20 +724,30 @@
         LogToPhyMap.put(cg, poPackage);
     }
     
-       /**
-     * Create the inner plans used to configure the Partition rearrange 
operators
-     */
        @Override
        protected void visit(LOJoin loj) throws VisitorException {
-        List<LogicalOperator> inputs = loj.getInputs();
-        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new 
MultiMap<PhysicalOperator, PhysicalPlan>();
 
+           String scope = loj.getOperatorKey().scope;
+           
+           // List of join predicates 
+           List<LogicalOperator> inputs = loj.getInputs();
+           
+           // Maps each input physical operator to the physical plans of its join keys.
+        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new 
LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
+        
+        // Outer list corresponds to join predicates. Inner list is inner 
physical plan of each predicate.
+        List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+        
+        // List of physical operator corresponding to join predicates.
         List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+        
+        // Outer list corresponds to join predicates and inner list 
corresponds to type of keys for each predicate.
+        List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
+
         for (LogicalOperator op : inputs) {
                        PhysicalOperator physOp = LogToPhyMap.get(op);
             inp.add(physOp);
             List<LogicalPlan> plans = (List<LogicalPlan>) 
loj.getJoinPlans().get(op);
-            
             List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
             currentPlans.push(currentPlan);
             for (LogicalPlan lp : plans) {
@@ -744,17 +755,23 @@
                 PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
mCurrentWalker.spawnChildWalker(lp);
                 pushWalker(childWalker);
                 mCurrentWalker.walk(this);
-                exprPlans.add((PhysicalPlan) currentPlan);
+                exprPlans.add(currentPlan);
                 popWalker();
             }
             currentPlan = currentPlans.pop();
-                       joinPlans.put(physOp, exprPlans);
+            ppLists.add(exprPlans);
+            joinPlans.put(physOp, exprPlans);
+            
+            // The key could be a tuple, so we visit every exprPlan to collect the type of each tuple member.
+            List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
+            for(PhysicalPlan exprPlan : exprPlans)
+                
tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
+            keyTypes.add(tupleKeyMemberTypes);
                }
 
                if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
                        POSkewedJoin skj;
                        try {
-                               String scope = loj.getOperatorKey().scope;
                                skj = new POSkewedJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
                                                                                
        inp);
                                skj.setJoinPlans(joinPlans);
@@ -777,40 +794,11 @@
                                }
                        }
                        LogToPhyMap.put(loj, skj);
-               } else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
-               String scope = loj.getOperatorKey().scope;
-               inputs = loj.getInputs();
-               List<List<PhysicalPlan>> ppLists = new 
ArrayList<List<PhysicalPlan>>();
-               List<Byte> keyTypes = new ArrayList<Byte>();
+               } 
+               
+               else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
                
                int fragment = 0;
-               inp = new ArrayList<PhysicalOperator>();
-               for (LogicalOperator op : inputs) {
-                   inp.add(LogToPhyMap.get(op));
-                   List<LogicalPlan> plans = (List<LogicalPlan>) 
loj.getJoinPlans()
-                           .get(op);
-                   
-                   List<PhysicalPlan> exprPlans = new 
ArrayList<PhysicalPlan>();
-                   currentPlans.push(currentPlan);
-                   for (LogicalPlan lp : plans) {
-                       currentPlan = new PhysicalPlan();
-                       PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
mCurrentWalker
-                               .spawnChildWalker(lp);
-                       pushWalker(childWalker);
-                       mCurrentWalker.walk(this);
-                       exprPlans.add((PhysicalPlan) currentPlan);
-                       popWalker();
-
-                   }
-                   currentPlan = currentPlans.pop();
-                   ppLists.add(exprPlans);
-                   
-                   if (plans.size() > 1) {
-                       keyTypes.add(DataType.TUPLE);
-                   } else {
-                       
keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
-                   }
-               }
                POFRJoin pfrj;
                try {
                    pfrj = new POFRJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
@@ -832,10 +820,36 @@
                    }
                }
                LogToPhyMap.put(loj, pfrj);
-               } else if(loj.getJoinType() == LOJoin.JOINTYPE.REGULAR) {
-               String scope = loj.getOperatorKey().scope;
-               inputs = loj.getInputs();
-               
+               }
+               
+               else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE && 
validateMergeJoin(loj)) {
+            
+                   POMergeJoin smj;
+            try {
+                smj = new POMergeJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),inp,joinPlans,keyTypes);
+            }
+            catch (Exception e) {
+                int errCode = 2042;
+                String msg = "Merge Join creation failed";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e);
+            }
+
+            smj.setResultType(DataType.TUPLE);
+            currentPlan.add(smj);
+
+            for (LogicalOperator op : inputs) {
+                try {
+                    currentPlan.connect(LogToPhyMap.get(op), smj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical 
plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, 
errCode, PigException.BUG, e);
+                }
+            }
+            LogToPhyMap.put(loj, smj);
+            return;
+        }
+               else if (loj.getJoinType() == LOJoin.JOINTYPE.REGULAR){
                POGlobalRearrange poGlobal = new POGlobalRearrange(new 
OperatorKey(
                        scope, nodeGen.getNextNodeId(scope)), loj
                        .getRequestedParallelism());
@@ -850,8 +864,6 @@
                
                try {
                    currentPlan.connect(poGlobal, poPackage);
-                   List<Boolean> flattenLst = Arrays.asList(true, true);
-
                    for (LogicalOperator op : inputs) {
                        List<LogicalPlan> plans = (List<LogicalPlan>) 
loj.getJoinPlans()
                                .get(op);
@@ -944,127 +956,39 @@
                    String msg = "Invalid physical operators in the physical 
plan" ;
                    throw new LogicalToPhysicalTranslatorException(msg, 
errCode, PigException.BUG, e1);
                }
-               LogToPhyMap.put(loj, fe);   
+               LogToPhyMap.put(loj, fe);
                }
        }
 
-
-       /**
-     * Add a local rearrange operator to the plan 
-        */
-/*
-       private void addLocalRearrange(LogicalOperator lo, PhysicalPlan &plan, 
List<PhysicalPlan> &exprPlans) throws VisitorException {
-        String scope = lo.getOperatorKey().scope;
-               POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
-                               scope, nodeGen.getNextNodeId(scope)), 
lo.getRequestedParallelism());
-
-               try {
-                       physOp.setPlans(exprPlans);
-               } catch (PlanException pe) {
-                       int errCode = 2071;
-                       String msg = "Problem with setting up local rearrange's 
plans.";
-                       throw new LogicalToPhysicalTranslatorException(msg, 
errCode, PigException.BUG, pe);
-               }
-               try {
-                       physOp.setIndex(count++);
-               } catch (ExecException e1) {
-                       int errCode = 2058;
-                       String msg = "Unable to set index on newly create 
POLocalRearrange.";
-                       throw new VisitorException(msg, errCode, 
PigException.BUG, e1);
-               }
-               if (plans.size() > 1) {
-                       type = DataType.TUPLE;
-                       physOp.setKeyType(type);
-               } else {
-                       type = 
exprPlans.get(0).getLeaves().get(0).getResultType();
-                       physOp.setKeyType(type);
-               }
-               physOp.setResultType(DataType.TUPLE);
-
-               currentPlan.add(physOp);
-
-               try {
-                       currentPlan.connect(LogToPhyMap.get(op), physOp);
-                       currentPlan.connect(physOp, poGlobal);
-               } catch (PlanException e) {
-                       int errCode = 2015;
-                       String msg = "Invalid physical operators in the 
physical plan" ;
-                       throw new LogicalToPhysicalTranslatorException(msg, 
errCode, PigException.BUG, e);
-               }
-       }
-  */  
-    /**
-     * Create the inner plans used to configure the Local Rearrange 
operators(ppLists)
-     * Extract the keytypes and create the POFRJoin operator.
-     */
-/*    @Override
-    protected void visit(LOFRJoin frj) throws VisitorException {
-        String scope = frj.getOperatorKey().scope;
-        List<LogicalOperator> inputs = frj.getInputs();
-        List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
-        List<Byte> keyTypes = new ArrayList<Byte>();
+       private boolean validateMergeJoin(LOJoin loj) throws 
LogicalToPhysicalTranslatorException{
+           
+           List<LogicalOperator> preds = loj.getInputs();
+
+           int errCode = 1101;
+           String errMsg = "Merge Join must have exactly two inputs.";
+           if(preds.size() != 2)
+            throw new LogicalToPhysicalTranslatorException(errMsg+" Found: 
"+preds.size(),errCode);
         
-        int fragment = findFrag(inputs,frj.getFragOp());
-        List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
-        for (LogicalOperator op : inputs) {
-            inp.add(LogToPhyMap.get(op));
-            List<LogicalPlan> plans = (List<LogicalPlan>) frj.getJoinColPlans()
-                    .get(op);
-            
-            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
-            currentPlans.push(currentPlan);
-            for (LogicalPlan lp : plans) {
-                currentPlan = new PhysicalPlan();
-                PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
mCurrentWalker
-                        .spawnChildWalker(lp);
-                pushWalker(childWalker);
-                mCurrentWalker.walk(this);
-                exprPlans.add((PhysicalPlan) currentPlan);
-                popWalker();
-
-            }
-            currentPlan = currentPlans.pop();
-            ppLists.add(exprPlans);
-            
-            if (plans.size() > 1) {
-                keyTypes.add(DataType.TUPLE);
-            } else {
-                
keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
-            }
-        }
-        POFRJoin pfrj;
-        try {
-            pfrj = new POFRJoin(new 
OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
-                                        inp, ppLists, keyTypes, null, 
fragment);
-        } catch (ExecException e1) {
-               int errCode = 2058;
-               String msg = "Unable to set index on newly create 
POLocalRearrange.";
-            throw new VisitorException(msg, errCode, PigException.BUG, e1);
-        }
-        pfrj.setResultType(DataType.TUPLE);
-        currentPlan.add(pfrj);
-        for (LogicalOperator op : inputs) {
-            try {
-                currentPlan.connect(LogToPhyMap.get(op), pfrj);
-            } catch (PlanException e) {
-                int errCode = 2015;
-                String msg = "Invalid physical operators in the physical plan" 
;
-                throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG, e);
-            }
-        }
-        LogToPhyMap.put(frj, pfrj);
-    }
-*/
-    private int findFrag(List<LogicalOperator> inputs, LogicalOperator fragOp) 
{
-        int i=-1;
-        for (LogicalOperator lop : inputs) {
-            if(fragOp.getOperatorKey().equals(lop.getOperatorKey()))
-                return ++i;
-        }
-        return -1;
+           return mergeJoinValidator(preds,loj.getPlan());
+       }
+       
+       private boolean mergeJoinValidator(List<LogicalOperator> 
preds,LogicalPlan lp) throws LogicalToPhysicalTranslatorException{
+           
+           int errCode = 1103;
+           String errMsg = "Merge join only supports Filter, Foreach, filter 
and Load as its predecessor. Found : ";
+           if(preds != null && !preds.isEmpty()){
+               for(LogicalOperator lo : preds){
+                   if (!(lo instanceof LOFilter || lo instanceof LOForEach || 
lo instanceof LOLoad))
+                    throw new LogicalToPhysicalTranslatorException(errMsg, 
errCode);
+                   // All is good at this level. Visit predecessors now.
+                   mergeJoinValidator(lp.getPredecessors(lo),lp);
+               }
+           }
+           // We visited everything and all is good.
+           return true;
     }
-
-    @Override
+       
+       @Override
     public void visit(LOFilter filter) throws VisitorException {
         String scope = filter.getOperatorKey().scope;
         POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Fri Aug 14 17:53:23 2009
@@ -243,7 +243,10 @@
     public void visitFRJoin(POFRJoin join) throws VisitorException {
         //do nothing
     }
-
+    
+    public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+        //do nothing
+    }
     /**
      * @param stream
      * @throws VisitorException 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 Fri Aug 14 17:53:23 2009
@@ -39,7 +39,6 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -73,7 +72,7 @@
     //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
     private List<List<PhysicalPlan>> phyPlanLists;
     //The key type for each Local Rearrange operator
-    private List<Byte> keyTypes;
+    private List<List<Byte>> keyTypes;
     //The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
     //The set of files that represent the replicated inputs
@@ -109,7 +108,7 @@
         this(k,rp,inp,null, null, null, -1);
     }
     
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, 
List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles, 
int fragment) throws ExecException{
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, 
List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] 
replFiles, int fragment) throws ExecException{
         super(k,rp,inp);
         
         phyPlanLists = ppLists;
@@ -148,7 +147,7 @@
             POLocalRearrange lr = new POLocalRearrange(genKey(old));
             lr.setIndex(i);
             lr.setResultType(DataType.TUPLE);
-            lr.setKeyType(keyTypes.get(i));
+            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : 
keyTypes.get(i).get(0));
             try {
                 lr.setPlans(ppLst);
             } catch (PlanException pe) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 Fri Aug 14 17:53:23 2009
@@ -64,6 +64,8 @@
     boolean setUpDone = false;
     // Indicates whether the filespec is splittable
     boolean splittable = true;
+    // default offset.
+    private long offset = 0;
     
     private final Log log = LogFactory.getLog(getClass());
     
@@ -76,6 +78,11 @@
         this(k,-1,lFile, splittable);
     }
     
+    public POLoad(OperatorKey k, FileSpec lFile, long offset, boolean 
splittable){
+        this(k,-1,lFile, splittable);
+        this.offset = offset;
+    }
+    
     public POLoad(OperatorKey k, int rp, FileSpec lFile,boolean splittable) {
         super(k, rp);
         this.lFile = lFile;
@@ -86,7 +93,7 @@
      * Set up the loader by 
      * 1) Instantiating the load func
      * 2) Opening an input stream to the specified file and
-     * 3) Binding to the input stream
+     * 3) Binding to the input stream at the specified offset.
      * @throws IOException
      */
     public void setUp() throws IOException{
@@ -95,7 +102,7 @@
         
         is = FileLocalizer.open(filename, pc);
         
-        loader.bindTo(filename , new BufferedPositionedInputStream(is), 0, 
Long.MAX_VALUE);
+        loader.bindTo(filename , new BufferedPositionedInputStream(is), 
this.offset, Long.MAX_VALUE);
     }
     
     /**


Reply via email to