Author: pradeepkth
Date: Thu Aug 20 18:08:32 2009
New Revision: 806281

URL: http://svn.apache.org/viewvc?rev=806281&view=rev
Log:
PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 20 18:08:32 2009
@@ -28,6 +28,8 @@
 
 IMPROVEMENTS
 
+PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)
+
 PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)
 
 PIG-893: Added string -> integer, long, float, and double casts (zjffdu via gates).

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 20 18:08:32 2009
@@ -34,6 +34,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
+import org.apache.pig.SamplableLoader;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -1065,6 +1066,7 @@
                 
             // We will first operate on right side which is indexer job.
             // First yank plan of the compiled right input and set that as an inner plan of right operator.
+            PhysicalPlan rightPipelinePlan;
             if(!rightMROpr.mapDone){
                 PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
                 if(rightMapPlan.getRoots().size() != 1){
@@ -1082,14 +1084,13 @@
                 
                 if (rightMapPlan.getSuccessors(rightLoader) == null || rightMapPlan.getSuccessors(rightLoader).isEmpty())
                     // Load - Join case.
-                    joinOp.setupRightPipeline(null); 
+                    rightPipelinePlan = null; 
                 
                 else{ // We got something on right side. Yank it and set it as inner plan of right input.
-                    PhysicalPlan rightPipelinePlan = rightMapPlan.clone();
+                    rightPipelinePlan = rightMapPlan.clone();
                     PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
                     rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
                     rightPipelinePlan.remove(root);
-                    joinOp.setupRightPipeline(rightPipelinePlan);
                     rightMapPlan.trimBelow(rightLoader);
                 }
             }
@@ -1097,12 +1098,12 @@
             else if(!rightMROpr.reduceDone){ 
                 // Indexer must run in map. If we are in reduce, close it and start new MROper.
                 // No need of yanking in this case. Since we are starting brand new MR Operator and it will contain nothing.
-                joinOp.setupRightPipeline(null);
                 POStore rightStore = getStore();
                 FileSpec rightStrFile = getTempFileSpec();
                 rightStore.setSFile(rightStrFile);
                 rightMROpr.setReduceDone(true);
                 rightMROpr = startNew(rightStrFile, rightMROpr);
+                rightPipelinePlan = null; 
             }
             
             else{
@@ -1111,15 +1112,23 @@
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
             
+            joinOp.setupRightPipeline(rightPipelinePlan);
+                        
             // At this point, we must be operating on map plan of right input and it would contain nothing else other than a POLoad.
             POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
             joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
 
             // Replace POLoad with  indexer.
-            String[] indexerArgs = new String[2];
-            indexerArgs[0] = rightLoader.getLFile().getFuncName();
+            String[] indexerArgs = new String[3];
+            indexerArgs[0] = rightLoader.getLFile().getFuncSpec().toString();
+            if (!(PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof SamplableLoader)){
+                int errCode = 1104;
+                String errMsg = "Right input of merge-join must implement SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't implement it";
+                throw new MRCompilerException(errMsg,errCode);
+            }
             List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
-            indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+            indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+            indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
             FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
             rightLoader.setLFile(lFile);
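
For context, the user-visible contract this new guard enforces: when a join is requested with USING "merge", the right input's load function must implement SamplableLoader, otherwise compilation fails with error 1104. A hedged sketch in the style of the tests below; the input file names are hypothetical stand-ins and both inputs must be sorted ascending on the join key. PigStorage, the default loader, passes the check, as the golden-file diff at the end of this commit shows:

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class MergeJoinExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
            // Hypothetical inputs; both sides sorted ascending on the join key.
            pigServer.registerQuery("A = LOAD 'left_sorted' AS (x:int);");
            pigServer.registerQuery("B = LOAD 'right_sorted' AS (x:int);"); // right side: loader must be samplable
            pigServer.registerQuery("C = JOIN A BY x, B BY x USING \"merge\";");
            for (Iterator<Tuple> it = pigServer.openIterator("C"); it.hasNext();) {
                System.out.println(;
            }
        }
    }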
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Thu Aug 20 18:08:32 2009
@@ -32,8 +32,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
@@ -49,16 +47,17 @@
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 
+/** This operator implements the merge join algorithm to do map-side joins.
+ *  Currently, only two-way joins are supported. One input of the join is identified as left
+ *  and the other as right. Left input tuples are the input records in the map.
+ *  Right tuples are read from HDFS by opening the right stream.
+ *
+ *    This join doesn't support outer join.
+ *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
+ */
+
 public class POMergeJoin extends PhysicalOperator {
 
-    /** This operator implements merge join algorithm to do map side joins. 
-     *  Currently, only two-way joins are supported. One input of join is identified as left
-     *  and other is identified as right. Left input tuples are the input records in map.
-     *  Right tuples are read from HDFS by opening right stream.
-     *  
-     *    This join doesn't support outer join.
-     *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
-     */
     private static final long serialVersionUID = 1L;
 
     private final transient Log log = LogFactory.getLog(getClass());
@@ -119,10 +118,6 @@
 
     private int arrayListSize = 1024;
 
-    private List<POCast> casters;
-
-    private List<POProject> projectors;
-
     /**
      * @param k
      * @param rp
@@ -140,35 +135,8 @@
         mTupleFactory = TupleFactory.getInstance();
         leftTuples = new ArrayList<Tuple>(arrayListSize);
         this.createJoinPlans(inpPlans,keyTypes);
-        setUpTypeCastingForIdxTup(keyTypes.get(0));
     }
 
-    /** This function setups casting for key tuples which we read out of index file.
-     * We set the type of key as DataByteArray(DBA) and then cast it into the type specified in schema.
-     * If type is not specified in schema, then we will cast from DBA to DBA.
-     */
-    
-    private void setUpTypeCastingForIdxTup(List<Byte> keyTypes){
-         /*   
-         * Cant reuse one POCast operator for all keys since POCast maintains some state
-         * and hence its not safe to use one POCast. Thus we use one POCast for each key.
-         */
-        casters = new ArrayList<POCast>(keyTypes.size());
-        projectors = new ArrayList<POProject>(keyTypes.size());
-
-        for(Byte keytype : keyTypes){
-            POCast caster = new POCast(genKey());
-            List<PhysicalOperator> pp = new ArrayList<PhysicalOperator>(1);
-            POProject projector = new POProject(genKey());
-            projector.setResultType(DataType.BYTEARRAY);
-            projector.setColumn(0);
-            pp.add(projector);
-            caster.setInputs(pp);
-            caster.setResultType(keytype);
-            projectors.add(projector);
-            casters.add(caster);
-        }
-    }
     /**
      * Configures the Local Rearrange operators to get keys out of tuple.
      * @throws ExecException 
@@ -504,50 +472,15 @@
     private Object extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{
 
         int idxTupSize = idxTuple.size();
-        List<Object> list = new ArrayList<Object>(idxTupSize-2);
-
-        for(int i=0; i<idxTupSize-2; i++){
-
-            projectors.get(i).attachInput(mTupleFactory.newTuple(idxTuple.get(i)));
 
-            switch (casters.get(i).getResultType()) {
-
-            case DataType.BYTEARRAY:    // POCast doesn't handle DBA. But we are saved, because in this case we don't need cast anyway.
-                list.add(idxTuple.get(i));
-                break;
-
-            case DataType.CHARARRAY:
-                list.add(casters.get(i).getNext(dummyString).result);
-                break;
-
-            case DataType.INTEGER:
-                list.add(casters.get(i).getNext(dummyInt).result);
-                break;
-
-            case DataType.FLOAT:
-                list.add(casters.get(i).getNext(dummyFloat).result);
-                break;
-
-            case DataType.DOUBLE:
-                list.add(casters.get(i).getNext(dummyDouble).result);
-                break;
-
-            case DataType.LONG:
-                list.add(casters.get(i).getNext(dummyLong).result);
-                break;
-
-            case DataType.TUPLE:
-                list.add(casters.get(i).getNext(dummyTuple).result);
-                break;
+        if(idxTupSize == 3)
+            return idxTuple.get(0);
+        
+        List<Object> list = new ArrayList<Object>(idxTupSize-2);
+        for(int i=0; i<idxTupSize-2;i++)
+            list.add(idxTuple.get(i));
 
-            default:
-                int errCode = 2036;
-                String errMsg = "Unhandled key type : "+casters.get(i).getResultType();
-                throw new ExecException(errMsg,errCode,PigException.BUG);
-            }
-        }
-        // If there is only one key, we don't want to wrap it into Tuple.
-        return list.size() == 1 ? list.get(0) : mTupleFactory.newTuple(list);
+        return mTupleFactory.newTupleNoCopy(list);
     }
 
     private Result getNextRightInp() throws ExecException{
@@ -623,7 +556,11 @@
 
         // bind loader to file pointed by this index Entry.
         int keysCnt = idxEntry.size();
-        rightLoader = new POLoad(genKey(), new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec),(Long)idxEntry.get(keysCnt-1), false);
+        Long offset = (Long)idxEntry.get(keysCnt-1);
+        if(offset > 0)
+            // Loader will throw away one tuple if we are in the middle of the block. We don't want that.
+            offset -= 1;
+        rightLoader = new POLoad(genKey(), new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec),offset, false);
         rightLoader.setPc(pc);
     }
 
@@ -673,8 +610,6 @@
 
     public void setRightLoaderFuncSpec(FuncSpec rightLoaderFuncSpec) {
         this.rightLoaderFuncSpec = rightLoaderFuncSpec;
-        for(POCast caster : casters)
-            caster.setLoadFSpec(rightLoaderFuncSpec);            
     }
 
     public List<PhysicalPlan> getInnerPlansOf(int index) {
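
A note on the new offset handling above: loaders started mid-block conventionally discard the possibly-partial record they land in and resume at the next record boundary. The index stores the exact byte offset of a sampled record, so binding the right loader exactly there would skip the very record the index points at; backing up one byte makes the loader land inside the previous record, throw it away, and return the indexed record first. Below is a self-contained illustration of that convention for newline-delimited records (plain Java, not Pig code; the skip-partial-record convention is the assumption being modeled):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class SkipFirstRecordDemo {
        // Returns the first full record at or after 'offset', discarding a
        // partial leading record the way a mid-block loader does.
        static String firstFullRecord(byte[] data, int offset) throws IOException {
            InputStream in = new ByteArrayInputStream(data, offset, data.length - offset);
            int c;
            if (offset > 0) {                      // mid-stream: skip to the next record boundary
                while ((c = != '\n' && c != -1) { /* discard partial record */ }
            }
            StringBuilder record = new StringBuilder();
            while ((c = != '\n' && c != -1) {
                record.append((char) c);
            }
            return record.toString();
        }

        public static void main(String[] args) throws IOException {
            byte[] data = "aaa\nbbb\nccc\n".getBytes();
            int idxOffset = 4;                                        // index entry points at "bbb"
            System.out.println(firstFullRecord(data, idxOffset));     // prints "ccc": indexed record lost
            System.out.println(firstFullRecord(data, idxOffset - 1)); // prints "bbb": the fix
        }
    }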

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java Thu Aug 20 18:08:32 2009
@@ -18,119 +18,216 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.data.DataType;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.util.ObjectSerializer;
 
-public class MergeJoinIndexer extends RandomSampleLoader {
+/** Merge Join indexer is used to generate an on-the-fly index for doing Merge Join efficiently.
+ *  It samples the first record from every block of the right side input
+ *  and returns a tuple in the following format:
+ *  (key0, key1,...,fileName, offset)
+ *  These tuples are then sorted before being written out to the index file on HDFS.
+ */
+
+public class MergeJoinIndexer implements LoadFunc{
 
-    /** Merge Join indexer is used to generate on the fly index for doing Merge Join efficiently.
-     *  It samples first record from every block of right side input (which is later opened as side file in merge join)
-     *  and returns tuple in the following format : 
-     *  (key0, key1,...,fileName, offset)
-     *  These tuples are then sorted before being written out to index file on HDFS.
-     */
-    
     private boolean firstRec = true;
     private transient TupleFactory mTupleFactory;
     private String fileName;
     private POLocalRearrange lr;
+    private PhysicalPlan precedingPhyPlan;
     private int keysCnt;
-    
+    private PhysicalOperator rightPipelineLeaf;
+    private PhysicalOperator rightPipelineRoot;
+    private Tuple dummyTuple = null;
+    private SamplableLoader loader;
+
     /** @param funcSpec : Loader specification.
-     *  @param serializedPlans : This is serialized version of LR plan. We 
+     *  @param innerPlan : This is serialized version of LR plan. We 
      *  want to keep only keys in our index file and not the whole tuple. So, we need LR and thus its plan
      *  to get keys out of the sampled tuple.  
+     * @param serializedPhyPlan Serialized physical plan on right side.
+     * @throws ExecException 
      */
     @SuppressWarnings("unchecked")
-    public MergeJoinIndexer(String funcSpec, String serializedPlans) throws ExecException{
-        super(funcSpec,"1");
-
+    public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
+        
+        loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
         try {
-            List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(serializedPlans);
+            List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
             lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
             lr.setPlans(innerPlans);
             keysCnt = innerPlans.size();
-            mTupleFactory = TupleFactory.getInstance();
-        }
-        catch (PlanException pe) {
-            int errCode = 2071;
-            String msg = "Problem with setting up local rearrange's plans.";
-            throw new ExecException(msg, errCode, PigException.BUG, pe);
+            precedingPhyPlan = (PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
+            if(precedingPhyPlan != null){
+                if(precedingPhyPlan.getLeaves().size() != 1 || precedingPhyPlan.getRoots().size() != 1){
+                    int errCode = 2168;
+                    String errMsg = "Expected physical plan with exactly one root and one leaf.";
+                    throw new ExecException(errMsg,errCode,PigException.BUG);
+                }
+                this.rightPipelineLeaf = precedingPhyPlan.getLeaves().get(0);
+                this.rightPipelineRoot = precedingPhyPlan.getRoots().get(0);
+                this.rightPipelineRoot.setInputs(null);
+            }
         }
         catch (IOException e) {
             int errCode = 2094;
-            String msg = "Unable to deserialize inner plans in Indexer.";
+            String msg = "Unable to deserialize plans in Indexer.";
             throw new ExecException(msg,errCode,e);
         }
+        mTupleFactory = TupleFactory.getInstance();
     }
 
     @Override
     public void bindTo(String fileName, BufferedPositionedInputStream is,long offset, long end) throws IOException {
         this.fileName = fileName;
-        super.bindTo(fileName, is, offset, end);
+        loader.bindTo(fileName, is, offset, end);
     }
 
     @Override
     public Tuple getNext() throws IOException {
 
-        if(!firstRec)   // We sample only record per block.
+        if(!firstRec)   // We sample only one record per block.
             return null;
 
+        long curPos;
+        Object key = null;
+        Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+        
         while(true){
-            long initialPos = loader.getPosition();
-            Tuple t = loader.getSampledTuple();
-            
-            if(null == t){          // We hit the end of block because all keys are null. 
-            
-                Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+            curPos = loader.getPosition();
+            Tuple readTuple = loader.getNext();
+
+            if(null == readTuple){    // We hit the end.
+
                 for(int i =0; i < keysCnt; i++)
                     wrapperTuple.set(i, null);
                 wrapperTuple.set(keysCnt, fileName);
-                wrapperTuple.set(keysCnt+1, initialPos);
+                wrapperTuple.set(keysCnt+1, curPos);
                 firstRec = false;
                 return wrapperTuple;
             }
-                
-            Tuple dummyTuple = null;
-            lr.attachInput(t);
-            Object key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
-            if(null == key)     // Tuple with null key. Drop it. Get next.
-                continue;
-            
-            Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);        
-            if(key instanceof Tuple){
-                Tuple tupKey = (Tuple)key;
-                for(int i =0; i < tupKey.size(); i++)
-                    wrapperTuple.set(i, tupKey.get(i));
+
+            if (null == precedingPhyPlan){
+
+                lr.attachInput(readTuple);
+                key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+                lr.detachInput();
+                if ( null == key) // Tuple with null key. Drop it.
+                    continue;
+                break;      
             }
 
-            else
-                wrapperTuple.set(0, key);
+            // There is a physical plan. 
 
-            lr.detachInput();
-            wrapperTuple.set(keysCnt, fileName);
-            wrapperTuple.set(keysCnt+1, initialPos);
+            rightPipelineRoot.attachInput(readTuple);
+            boolean fetchNewTup;
 
-            firstRec = false;
-            return wrapperTuple;
+            while(true){
+
+                Result res = rightPipelineLeaf.getNext(dummyTuple);
+                switch(res.returnStatus){
+
+                case POStatus.STATUS_OK:
+
+                    lr.attachInput((Tuple)res.result);
+                    key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+                    lr.detachInput();
+                    if ( null == key) // Tuple with null key. Drop it.
+                        continue;
+                    fetchNewTup = false;
+                    break;
+
+                case POStatus.STATUS_EOP:
+                    fetchNewTup = true;
+                    break;
+
+                default:
+                    int errCode = 2164;
+                    String errMsg = "Expected EOP/OK as return status. Found: "+res.returnStatus;
+                    throw new ExecException(errMsg,errCode);
+                }            
+                break;
+            }
+            if (!fetchNewTup)
+                break;
         }
+
+        if(key instanceof Tuple){
+            Tuple tupKey = (Tuple)key;
+            for(int i =0; i < tupKey.size(); i++)
+                wrapperTuple.set(i, tupKey.get(i));
+        }
+
+        else
+            wrapperTuple.set(0, key);
+
+        wrapperTuple.set(keysCnt, fileName);
+        wrapperTuple.set(keysCnt+1, curPos);    
+        firstRec = false;
+        return wrapperTuple;
+    }
+    
+    public Integer bytesToInteger(byte[] b) throws IOException {
+        return loader.bytesToInteger(b);
     }
 
-    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
-        is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
+    public Long bytesToLong(byte[] b) throws IOException {
+        return loader.bytesToLong(b);
+    }
+
+    public Float bytesToFloat(byte[] b) throws IOException {
+        return loader.bytesToFloat(b);
+    }
+
+    public Double bytesToDouble(byte[] b) throws IOException {
+        return loader.bytesToDouble(b);
+    }
+
+    public String bytesToCharArray(byte[] b) throws IOException {
+        return loader.bytesToCharArray(b);
+    }
+
+    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        return loader.bytesToMap(b);
+    }
+
+    public Tuple bytesToTuple(byte[] b) throws IOException {
+        return loader.bytesToTuple(b);
+    }
+
+    public DataBag bytesToBag(byte[] b) throws IOException {
+        return loader.bytesToBag(b);
+    }
+
+    public void fieldsToRead(Schema schema) {
+        loader.fieldsToRead(schema);
+    }
+
+    public Schema determineSchema(
+            String fileName,
+            ExecType execType,
+            DataStorage storage) throws IOException {
+        return loader.determineSchema(fileName, execType, storage);
     }
 }
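
To make the indexer's contract concrete: bound to one block of the right input, getNext() returns exactly one index tuple, (key0, ..., keyN-1, fileName, offset), and then null; if every remaining record in the block has a null key, the entry carries null keys and the end-of-block position. Below is a hedged sketch of a caller, using only API visible in this patch; the stream, block bounds, serialized key plans, and file name are hypothetical stand-ins. (MRCompiler serializes a null right pipeline to the empty string, which is why the golden-file diff gains a trailing '' argument.)

    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.builtin.MergeJoinIndexer;
    import org.apache.pig.impl.io.BufferedPositionedInputStream;

    public class IndexerSketch {
        // serializedLRPlans, blockStream, blockStart and blockEnd are hypothetical.
        static Tuple sampleBlock(String serializedLRPlans,
                                 BufferedPositionedInputStream blockStream,
                                 long blockStart, long blockEnd) throws Exception {
            MergeJoinIndexer indexer = new MergeJoinIndexer(
                    "org.apache.pig.builtin.PigStorage", // must resolve to a SamplableLoader
                    serializedLRPlans,                   // serialized local-rearrange (key) plans
                    "");                                 // empty string deserializes to a null pipeline
            indexer.bindTo("part-00000", blockStream, blockStart, blockEnd); // hypothetical file/bounds
            Tuple entry = indexer.getNext();   // (key0, ..., fileName, offset)
            assert indexer.getNext() == null;  // only one sample per block
            return entry;
        }
    }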

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Thu Aug 20 18:08:32 2009
@@ -407,6 +407,37 @@
     }       
 
     @Test
+    public void testIndexer() throws IOException{
+        Util.createInputFile(cluster, "temp_file1", new String[]{1+""});
+        Util.createInputFile(cluster, "temp_file2", new String[]{2+""});
+        Util.createInputFile(cluster, "temp_file3", new String[]{10+""});
+        pigServer.registerQuery("A = LOAD 'temp_file*' as (a:int);");
+        pigServer.registerQuery("B = LOAD 'temp_file*' as (a:int);");
+        DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbmrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Util.deleteFile(cluster, "temp_file1");
+        Util.deleteFile(cluster, "temp_file2");
+        Util.deleteFile(cluster, "temp_file3");
+        Assert.assertEquals(dbmrj.size(),dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbmrj, dbshj));
+    }
+    
+    @Test
     public void testMergeJoinSch1() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Thu Aug 20 18:08:32 2009
@@ -1,23 +1,23 @@
-MapReduce(-1,PigStorage) - scope-127:
+MapReduce(-1,PigStorage) - scope-125:
 Reduce Plan Empty
-|   Store(file:/tmp:org.apache.pig.builtin.PigStorage) - scope-126
+|   Store(file:/tmp:org.apache.pig.builtin.PigStorage) - scope-124
 |   |
 |   |---MergeJoin[tuple] - scope-121
 |       |
 |       |---Load(file:/tmp/input1:org.apache.pig.builtin.PigStorage) - scope-117
 |
-|---MapReduce(-1,PigStorage) - scope-128:
-    |   Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - scope-135
+|---MapReduce(-1,PigStorage) - scope-126:
+    |   Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - scope-133
     |   |
-    |   |---POSort[tuple]() - scope-134
+    |   |---POSort[tuple]() - scope-132
     |       |   |
-    |       |   Project[tuple][*] - scope-133
+    |       |   Project[tuple][*] - scope-131
     |       |
-    |       |---Project[tuple][1] - scope-132
+    |       |---Project[tuple][1] - scope-130
     |           |
-    |           |---Package[tuple]{chararray} - scope-131
-    |   Local Rearrange[tuple]{chararray}(false) - scope-130
+    |           |---Package[tuple]{chararray} - scope-129
+    |   Local Rearrange[tuple]{chararray}(false) - scope-128
     |   |   |
-    |   |   Constant(all) - scope-129
+    |   |   Constant(all) - scope-127
     |   |
-    |   
|---Load(file:/tmp/input2:org.apache.pig.impl.builtin.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaabfkaaangfgogeepggebgmgmejgohahfhehihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoepcpjdmpihcaifknacaaagemaaakgneghcgpgnefgeghgfhdheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlemaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaignfegpefgeghgfhdhbaahoaaaehihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbh
 
ggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaahiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghfkaaaehdhegbhcemaaalgcgbghejhegfhcgbhegphcheaabeemgkgbhggbcphfhegjgmcpejhegfhcgbhegphcdlemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhedlhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhd
 
gjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaadgmgpghheaacaemgphcghcpgbhagbgdgigfcpgdgpgngngpgohdcpgmgpghghgjgoghcpemgpghdlhihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaakfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaaadgmgpghhbaahoaabfemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgig
 
fcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaappppppppdchahahahdhcaaclgphcghcogbhagbgdgigfcogdgpgngngpgohdcogmgpghghgjgoghcogjgnhagmcoemgpghdeekemgpghghgfhccikmpnoicknfncdiacaaabemaaaegogbgngfhbaahoaaaohihaheaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdhehahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachahbaahoaabpaaaaaaaahahdhbaahoaaaaaaaaaaabhhaeaaaaaaabhdhcaa
 
bbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaabnhihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaabnhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahi'))
 - scope-118
\ No newline at end of file
+    |   
|---Load(file:/tmp/input2:org.apache.pig.impl.builtin.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaabfkaaangfgogeepggebgmgmejgohahfhehihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoepcpjdmpihcaifknacaaagemaaakgneghcgpgnefgeghgfhdheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlemaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaignfegpefgeghgfhdhbaahoaaaehihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbh
 
ggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaahiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghfkaaaehdhegbhcemaaalgcgbghejhegfhcgbhegphcheaabeemgkgbhggbcphfhegjgmcpejhegfhcgbhegphcdlemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhedlhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhd
 
gjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaadgmgpghheaacaemgphcghcpgbhagbgdgigfcpgdgpgngngpgohdcpgmgpghghgjgoghcpemgpghdlhihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaakfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaaadgmgpghhbaahoaabfemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgig
 
fcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaappppppppdchahahahdhcaaclgphcghcogbhagbgdgigfcogdgpgngngpgohdcogmgpghghgjgoghcogjgnhagmcoemgpghdeekemgpghghgfhccikmpnoicknfncdiacaaabemaaaegogbgngfhbaahoaaaohihaheaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdhehahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachahbaahoaabpaaaaaaaahahdhbaahoaaaaaaaaaaabhhaeaaaaaaabhdhcaa
 
bbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaabnhihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaabnhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahi',''))
 - scope-118
\ No newline at end of file

