Author: pradeepkth
Date: Mon Nov 23 20:27:30 2009
New Revision: 883486

URL: http://svn.apache.org/viewvc?rev=883486&view=rev
Log:
PIG-1088: change merge join and merge join indexer to work with new LoadFunc 
interface (thejas via pradeepkth)

Added:
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
Removed:
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Mon Nov 23 20:27:30 2009
@@ -24,6 +24,9 @@
 
 INCOMPATIBLE CHANGES
 
+PIG-1088: change merge join and merge join indexer to work with new LoadFunc
+interface (thejas via pradeepkth)
+
 PIG-879:  Pig should provide a way for input location string in load statement
 to be passed as-is to the Loader (rding via pradeepkth)
 

Added: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java?rev=883486&view=auto
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
 (added)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
 Mon Nov 23 20:27:30 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * This class provides an implementation of OrderedLoadFunc interface
+ * which can be optionally re-used by LoadFuncs that use FileInputFormat, by
+ * having this as a super class
+ */
+public abstract class FileInputLoadFunc extends OrderedLoadFunc {
+    
+    @Override
+    public WritableComparable<?> getSplitComparable(PigSplit split)
+    throws IOException{
+        FileSplit fileSplit = null;
+        if(split.getWrappedSplit() instanceof FileSplit){
+            fileSplit = (FileSplit)split.getWrappedSplit();
+        }else{
+            throw new RuntimeException("LoadFunc expected split of type FileSplit");
+        }
+        
+        return new FileSplitComparable(
+                fileSplit.getPath().toString(),
+                fileSplit.getStart()
+        );
+    }
+
+}
+
+

Added: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java?rev=883486&view=auto
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
 (added)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
 Mon Nov 23 20:27:30 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.data.DataReaderWriter;
+
+
+/**
+ * This class can be used to represent a relative position in a file. 
+ * compareTo(other) function of WritableComparable interface is used to compare
+ * position of different objects of this class.
+ */
+public class FileSplitComparable implements 
WritableComparable<FileSplitComparable>, Serializable{
+
+    private static final long serialVersionUID = 1L;
+    protected String filename;
+    protected Long offset;
+    
+    //need a default constructor to be able to de-serialize using just 
+    // the Writable interface
+    public FileSplitComparable(){}
+    
+    public FileSplitComparable(String fileName, long offset){
+        this.filename = fileName;
+        this.offset = offset;
+    }
+
+
+    @Override
+    public int compareTo(FileSplitComparable other) {
+        int rc = filename.compareTo(other.filename);
+        if (rc == 0)
+            rc = Long.signum(offset - other.offset);
+        return rc;
+    }
+
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        filename = (String) DataReaderWriter.readDatum(in);
+        offset = (Long)DataReaderWriter.readDatum(in);
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        DataReaderWriter.writeDatum(out, filename);
+        DataReaderWriter.writeDatum(out, offset);
+    }
+
+    @Override
+    public String toString(){
+        return "FileName: '" + filename + "' Offset: " + offset; 
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result
+                + ((filename == null) ? 0 : filename.hashCode());
+        result = prime * result + ((offset == null) ? 0 : offset.hashCode());
+        return result;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        FileSplitComparable other = (FileSplitComparable) obj;
+        if (filename == null) {
+            if (other.filename != null)
+                return false;
+        } else if (!filename.equals(other.filename))
+            return false;
+        if (offset == null) {
+            if (other.offset != null)
+                return false;
+        } else if (!offset.equals(other.offset))
+            return false;
+        return true;
+    }
+}
\ No newline at end of file

Added: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java?rev=883486&view=auto
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java 
(added)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java 
Mon Nov 23 20:27:30 2009
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Implementing this interface indicates to Pig that a given loader
+ * can be used for MergeJoin. The position as represented by the
+ * WritableComparable object is stored in the index created by
+ *  MergeJoin sampling MR job to get an ordered sequence of splits.
+ * This is necessary when the sort key spans multiple splits.
+ */
+public abstract class OrderedLoadFunc extends LoadFunc {
+
+    /**
+     * The WritableComparable object returned will be used to compare
+     * the position of different splits in an ordered stream
+     * @param split
+     * @return WritableComparable representing the position of the split in 
input
+     * @throws IOException
+     */
+    public abstract WritableComparable<?> getSplitComparable(PigSplit split) 
+    throws IOException;
+
+}

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java 
(original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java 
Mon Nov 23 20:27:30 2009
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig;
-
-import java.io.IOException;
-
-import org.apache.pig.data.Tuple;
-import org.apache.pig.LoadFunc;
-
-/**
- * Implementing this interface indicates to Pig that a given loader can be 
- * used by a sampling loader.  The requirement for this is that the loader
- * can handle a getNext() call without knowing the position in the file.
- * This will not be the case for loaders that handle structured data such
- * as XML where they must start at the beginning of the file in order to 
- * understand their position.  Record oriented loaders such as PigStorage
- * can handle this by seeking to the next record delimiter and starting
- * from that point.  Another requirement is that the loader be able to 
- * skip or seek in its input stream.
- */
-public abstract class SamplableLoader extends LoadFunc {
-    
-    /**
-     * Skip ahead in the input stream.
-     * @param n number of bytes to skip
-     * @return number of bytes actually skipped.  The return semantics are
-     * exactly the same as {@link java.io.InpuStream#skip(long)}
-     */
-    public abstract long skip(long n) throws IOException;
-    
-    /**
-     * Get the current position in the stream.
-     * @return position in the stream.
-     */
-    public abstract long getPosition() throws IOException;
-    
-    /**
-     * Get the next tuple from the stream starting from the current 
-     * read position.
-     * The loader implementation should not assume that current read position 
-     * in the stream is at the beginning of a record since this method is 
called
-     * for sampling and the current read position in the stream could be 
anywhere
-     * in the stream. 
-     * @return the next tuple from underlying input stream or null if there 
are no more tuples
-     * to be processed.
-     */
-    public abstract Tuple getSampledTuple() throws IOException;
-}

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Mon Nov 23 20:27:30 2009
@@ -34,6 +34,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.builtin.BinStorage;
@@ -42,7 +43,6 @@
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
-import org.apache.pig.impl.builtin.MergeJoinIndexer;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -56,7 +56,6 @@
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -1183,17 +1182,25 @@
                 String[] indexerArgs = new String[3];
                 FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
                 indexerArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
-
+                if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) 
instanceof OrderedLoadFunc)){
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement " +
+                    "OrderedLoadFunc interface. The specified loader " 
+                    + indexerArgs[0] + " doesn't implement it";
+                    throw new MRCompilerException(errMsg,errCode);
+                }
                 List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
                 indexerArgs[1] = 
ObjectSerializer.serialize((Serializable)rightInpPlans);
                 indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
                 FileSpec lFile = new 
FileSpec(rightLoader.getLFile().getFileName(),new 
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
                 rightLoader.setLFile(lFile);
     
-                // Loader of mro will return a tuple of form (key1, key2, 
..,filename, offset)
+                // Loader of mro will return a tuple of form - 
+                // (keyFirst1, keyFirst2, .. , position, splitIndex) See 
MergeJoinIndexer
                 // Now set up a POLocalRearrange which has "all" as the key 
and tuple fetched
                 // by loader as the "value" of POLocalRearrange
-                // Sorting of index can possibly be achieved by using Hadoop 
sorting between map and reduce instead of Pig doing sort. If that is so, 
+                // Sorting of index can possibly be achieved by using Hadoop 
sorting 
+                // between map and reduce instead of Pig doing sort. If that 
is so, 
                 // it will simplify lot of the code below.
                 
                 PhysicalPlan lrPP = new PhysicalPlan();
@@ -1259,11 +1266,12 @@
                 rightMROpr.setReduceDone(true);
                 
                 // set up the DefaultIndexableLoader for the join operator
-                String[] defaultIndexableLoaderArgs = new String[4];
+                String[] defaultIndexableLoaderArgs = new String[5];
                 defaultIndexableLoaderArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
                 defaultIndexableLoaderArgs[1] = strFile.getFileName();
                 defaultIndexableLoaderArgs[2] = 
strFile.getFuncSpec().toString();
                 defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+                defaultIndexableLoaderArgs[4] = 
origRightLoaderFileSpec.getFileName();
                 joinOp.setRightLoaderFuncSpec((new 
FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
                 
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());    
                  

Added: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=883486&view=auto
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
 (added)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
 Mon Nov 23 20:27:30 2009
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/** Merge Join indexer is used to generate on the fly index for doing Merge 
Join efficiently.
+ *  It samples first record from every block of right side input. 
+ *  and returns tuple in the following format : 
+ *  (key0, key1,...,position,splitIndex)
+ *  These tuples are then sorted before being written out to index file on 
HDFS.
+ */
+public class MergeJoinIndexer  extends LoadFunc{
+
+    private boolean firstRec = true;
+    private transient TupleFactory mTupleFactory;
+    private POLocalRearrange lr;
+    private PhysicalPlan precedingPhyPlan;
+    private int keysCnt;
+    private PhysicalOperator rightPipelineLeaf;
+    private PhysicalOperator rightPipelineRoot;
+    private Tuple dummyTuple = null;
+    private OrderedLoadFunc loader;
+    private PigSplit pigSplit = null;
+    
+    /** @param funcSpec : Loader specification.
+     *  @param innerPlan : This is serialized version of LR plan. We 
+     *  want to keep only keys in our index file and not the whole tuple. So, 
we need LR and thus its plan
+     *  to get keys out of the sampled tuple.  
+     * @param serializedPhyPlan Serialized physical plan on right side.
+     * @throws ExecException 
+     */
+    @SuppressWarnings("unchecked")
+    public MergeJoinIndexer(String funcSpec, String innerPlan, String 
serializedPhyPlan) throws ExecException{
+        
+        loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+        try {
+            List<PhysicalPlan> innerPlans = 
(List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
+            lr = new POLocalRearrange(new OperatorKey("MergeJoin 
Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
+            lr.setPlans(innerPlans);
+            keysCnt = innerPlans.size();
+            precedingPhyPlan = 
(PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
+            if(precedingPhyPlan != null){
+                    if(precedingPhyPlan.getLeaves().size() != 1 || 
precedingPhyPlan.getRoots().size() != 1){
+                        int errCode = 2168;
+                        String errMsg = "Expected physical plan with exactly one root and one leaf.";
+                        throw new 
ExecException(errMsg,errCode,PigException.BUG);
+                    }
+                this.rightPipelineLeaf = precedingPhyPlan.getLeaves().get(0);
+                this.rightPipelineRoot = precedingPhyPlan.getRoots().get(0);
+                this.rightPipelineRoot.setInputs(null);                        
    
+            }
+        }
+        catch (IOException e) {
+            int errCode = 2094;
+            String msg = "Unable to deserialize plans in Indexer.";
+            throw new ExecException(msg,errCode,e);
+        }
+        mTupleFactory = TupleFactory.getInstance();
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        
+        if(!firstRec)   // We sample only one record per block.
+            return null;
+        WritableComparable<?> position = loader.getSplitComparable(pigSplit);
+        Object key = null;
+        Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+        
+        while(true){
+            Tuple readTuple = loader.getNext();
+
+            if(null == readTuple){    // We hit the end.
+
+                for(int i =0; i < keysCnt; i++)
+                    wrapperTuple.set(i, null);
+                wrapperTuple.set(keysCnt, position);
+                firstRec = false;
+                return wrapperTuple;
+            }
+
+            if (null == precedingPhyPlan){
+
+                lr.attachInput(readTuple);
+                key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+                lr.detachInput();
+                if ( null == key) // Tuple with null key. Drop it.
+                    continue;
+                break;      
+            }
+
+            // There is a physical plan. 
+
+            rightPipelineRoot.attachInput(readTuple);
+            boolean fetchNewTup;
+
+            while(true){
+
+                Result res = rightPipelineLeaf.getNext(dummyTuple);
+                switch(res.returnStatus){
+
+                case POStatus.STATUS_OK:
+
+                    lr.attachInput((Tuple)res.result);
+                    key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+                    lr.detachInput();
+                    if ( null == key) // Tuple with null key. Drop it.
+                        continue;
+                     fetchNewTup = false;
+                    break;
+
+                case POStatus.STATUS_EOP:
+                    fetchNewTup = true;
+                    break;
+
+                default:
+                    int errCode = 2164;
+                    String errMsg = "Expected EOP/OK as return status. Found: "+res.returnStatus;
+                    throw new ExecException(errMsg,errCode);
+                }            
+                break;
+            }
+            if (!fetchNewTup)
+                break;
+        }
+
+        if(key instanceof Tuple){
+            Tuple tupKey = (Tuple)key;
+            for(int i =0; i < tupKey.size(); i++)
+                wrapperTuple.set(i, tupKey.get(i));
+        }
+
+        else
+            wrapperTuple.set(0, key);
+
+        wrapperTuple.set(keysCnt, position);
+        wrapperTuple.set(keysCnt+1, pigSplit.getSplitIndex());
+        firstRec = false;
+        return wrapperTuple;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getInputFormat()
+     */
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        return loader.getInputFormat();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getLoadCaster()
+     */
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return loader.getLoadCaster();
+    }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, 
org.apache.hadoop.mapreduce.InputSplit)
+     */
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) throws 
IOException {
+        loader.prepareToRead(reader, split);
+        pigSplit = split;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, 
org.apache.hadoop.mapreduce.Job)
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        loader.setLocation(location, job);
+    }
+}

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 Mon Nov 23 20:27:30 2009
@@ -58,8 +58,6 @@
     transient LoadFunc loader = null;
     // The filespec on which the operator is based
     FileSpec lFile;
-    // The stream used to bind to by the loader
-    transient InputStream is;
     // PigContext passed to us by the operator creator
     PigContext pc;
     //Indicates whether the loader setup is done or not
@@ -107,13 +105,7 @@
                 ConfigurationUtil.toConfiguration(pc.getProperties()), 
                 filename,
                 0);
-        
-     // XXX : FIXME need to get this to work with new loadfunc interface - the 
-        // below code is for merge join - hopefully we will no longer need it
-        // and then we can just get rid of it and the rest should be fine.
-        is = (this.offset == 0) ? FileLocalizer.open(filename, pc) : 
FileLocalizer.open(filename, this.offset,pc);
-        
-//        loader.bindTo(filename , new BufferedPositionedInputStream(is), 
this.offset, Long.MAX_VALUE);
+
         
     }
     
@@ -123,7 +115,6 @@
      * @throws IOException
      */
     public void tearDown() throws IOException{
-        is.close();
         setUpDone = false;
     }
     

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
 Mon Nov 23 20:27:30 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
@@ -54,8 +55,9 @@
 import org.apache.pig.impl.io.BinStorageRecordWriter;
 import org.apache.pig.impl.util.LogUtils;
 
-public class BinStorage extends LoadFunc 
-        implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc {
+public class BinStorage extends FileInputLoadFunc 
+implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc {
+
     
     public static final int RECORD_1 = 0x01;
     public static final int RECORD_2 = 0x02;

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
 Mon Nov 23 20:27:30 2009
@@ -37,6 +37,8 @@
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
@@ -55,8 +57,9 @@
  * delimiter is given as a regular expression. See String.split(delimiter) and
  * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for 
more information.
  */
-public class PigStorage extends LoadFunc
-        implements ReversibleLoadStoreFunc, StoreFunc {
+public class PigStorage 
+extends FileInputLoadFunc 
+implements ReversibleLoadStoreFunc, StoreFunc {
     protected RecordReader in = null;
     protected RecordWriter writer = null;
     protected final Log mLog = LogFactory.getLog(getClass());

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
 Mon Nov 23 20:27:30 2009
@@ -20,12 +20,14 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
 import org.apache.pig.backend.executionengine.ExecException;
 
 /**
@@ -98,6 +100,30 @@
         return new String(ba, DataReaderWriter.UTF8);
     }
     
+    public static Writable bytesToWritable(DataInput in) throws IOException {
+        String className = (String) readDatum(in);
+        // create the Writable instance; its class needs to have a default constructor
+        Class<?> objClass = null ;
+        try {
+            objClass = Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Could not find class " + className + 
+                    ", while attempting to de-serialize it ", e);
+        }
+        Writable writable = null;
+        try {
+            writable = (Writable) objClass.newInstance();
+        } catch (Exception e) {
+            String msg = "Could create instance of class " + className + 
+            ", while attempting to de-serialize it. (no default constructor 
?)";
+            throw new IOException(msg, e);
+        } 
+        
+        //read the fields of the object from DataInput
+        writable.readFields(in);
+        return writable;
+    }
+    
     public static Object readDatum(DataInput in) throws IOException, 
ExecException {
         // Read the data type
         byte b = in.readByte();
@@ -149,7 +175,10 @@
 
             case DataType.CHARARRAY: 
                 return bytesToCharArray(in);
-            
+                
+            case DataType.GENERIC_WRITABLECOMPARABLE :
+                return bytesToWritable(in);
+                
             case DataType.NULL:
                 return null;
 
@@ -261,6 +290,13 @@
                 }
                 break;
                                      }
+            case DataType.GENERIC_WRITABLECOMPARABLE :
+                out.writeByte(DataType.GENERIC_WRITABLECOMPARABLE);
+                //store the class name, so we know the class to create on read
+                writeDatum(out, val.getClass().getName());
+                Writable writable = (Writable)val;
+                writable.write(out);
+                break;
 
             case DataType.NULL:
                 out.writeByte(DataType.NULL);

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java 
(original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java 
Mon Nov 23 20:27:30 2009
@@ -24,8 +24,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.Set;
 
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -62,6 +62,12 @@
     public static final byte MAP       = 100;
     public static final byte TUPLE     = 110;
     public static final byte BAG       = 120;
+    
+    // internal use only; used to store WritableComparable objects
+    // for creating an ordered index in MergeJoin. Expects an object that
+    // implements the Writable interface and has a default constructor
+    public static final byte GENERIC_WRITABLECOMPARABLE = 123; 
+    
     public static final byte INTERNALMAP = 127; // internal use only; for maps 
that are object->object.  Used by FindQuantiles.
     public static final byte ERROR     =  -1;
 
@@ -86,6 +92,7 @@
         else if (o instanceof Double) return DOUBLE;
         else if (o instanceof Boolean) return BOOLEAN;
         else if (o instanceof Byte) return BYTE;
+        else if (o instanceof WritableComparable) return 
GENERIC_WRITABLECOMPARABLE;
         else {return ERROR;}
     }
 
@@ -135,6 +142,7 @@
                }  else {
                    interfaces = ioeInterfaces;
                }
+               boolean matchedWritableComparable = false;
                for (int i = 0; i < interfaces.length; i++) {
                    if 
(interfaces[i].getName().equals("org.apache.pig.data.Tuple")) {
                        return TUPLE;
@@ -142,8 +150,13 @@
                        return BAG;
                    } else if (interfaces[i].getName().equals("java.util.Map")) 
{
                        return MAP;
-                   }
+                   } else if 
(interfaces[i].getName().equals("org.apache.hadoop.io.WritableComparable")) {
+                       // use GENERIC_WRITABLECOMPARABLE type only as last resort
+                       matchedWritableComparable = true;
+                    }
                }
+               if(matchedWritableComparable)
+                   return GENERIC_WRITABLECOMPARABLE;
                
                return ERROR;
        }
@@ -154,14 +167,19 @@
     }
     public static byte[] genAllTypes(){
         byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, 
DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY, 
-                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, 
DataType.INTEGER, DataType.INTERNALMAP, 
+                DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, 
+                DataType.GENERIC_WRITABLECOMPARABLE,
+                DataType.INTEGER, DataType.INTERNALMAP, 
                 DataType.LONG, DataType.MAP, DataType.TUPLE};
         return types;
     }
     
     private static String[] genAllTypeNames(){
-        String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", 
"BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER",
-                "INTERNALMAP", "LONG", "MAP", "TUPLE" };
+        String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", 
"BYTEARRAY", 
+                "CHARARRAY", "DOUBLE", "FLOAT", 
+                "GENERIC_WRITABLECOMPARABLE",
+                "INTEGER","INTERNALMAP",
+                "LONG", "MAP", "TUPLE" };
         return names;
     }
     
@@ -215,6 +233,7 @@
         case INTERNALMAP: return "internalmap";
         case TUPLE:     return "tuple";
         case BAG:       return "bag";
+        case GENERIC_WRITABLECOMPARABLE: return "generic_writablecomparable";
         default: return "Unknown";
         }
     }
@@ -253,7 +272,8 @@
                 (dataType == FLOAT) ||
                 (dataType == DOUBLE) ||
                 (dataType == BOOLEAN) ||
-                (dataType == BYTE));
+                (dataType == BYTE) ||
+                (dataType == GENERIC_WRITABLECOMPARABLE));
     }
 
     /**
@@ -366,6 +386,9 @@
                 }
                       }
 
+            case GENERIC_WRITABLECOMPARABLE:
+                return ((Comparable)o1).compareTo(o2);
+
             case INTERNALMAP:
                 return -1;  // Don't think anyway will want to do this.
                 
@@ -374,6 +397,7 @@
 
             case BAG:
                 return ((DataBag)o1).compareTo((DataBag)o2);
+                
 
             default:
                 throw new RuntimeException("Unkown type " + dt1 +

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
 Mon Nov 23 20:27:30 2009
@@ -18,14 +18,10 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.zip.GZIPInputStream;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,8 +30,8 @@
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.SeekableInputStream;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -44,17 +40,19 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.tools.bzip2r.CBZip2InputStream;
 
 /**
- *
+ * Used by MergeJoin. Takes an index on sorted data
+ * consisting of sorted tuples of the form
+ * (key1,key2..., position,splitIndex) as input. For the key given in seekNear(Tuple),
+ * finds the splitIndex that can contain the key and initializes ReadToEndLoader
+ * to read from that splitIndex onwards, in the sequence of splits in the index.
  */
-//XXX FIXME - make this work with new load-store redesign
 public class DefaultIndexableLoader extends IndexableLoadFunc {
 
     
@@ -70,20 +68,26 @@
     private String scope;
     private Tuple dummyTuple = null;
     private transient TupleFactory mTupleFactory;
-    private InputStream  is;
-    private String currentFileName;
+
+    private String inpLocation;
     
-    public DefaultIndexableLoader(String loaderFuncSpec, String indexFile, 
String indexFileLoadFuncSpec, String scope) {
+    public DefaultIndexableLoader(
+            String loaderFuncSpec, 
+            String indexFile,
+            String indexFileLoadFuncSpec,
+            String scope,
+            String inputLocation
+    ) {
         this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec);
         this.indexFile = indexFile;
         this.indexFileLoadFuncSpec = indexFileLoadFuncSpec;
         this.scope = scope;
+        this.inpLocation = inputLocation;
     }
     
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
-        
         // some setup
         mTupleFactory = TupleFactory.getInstance();
 
@@ -100,7 +104,6 @@
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
-        
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new 
FuncSpec(indexFileLoadFuncSpec)), false);
         try {
             pc = 
(PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
@@ -115,6 +118,7 @@
         for(Result 
res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
             index.offer((Tuple) res.result);   
 
+        
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
      
@@ -137,11 +141,14 @@
             }
             
             if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
+                index.addFirst(curIdxEntry);  // We need to add back the 
current index Entry because we are reading ahead.
                 if(null == prevIdxEntry)   // very first entry in index.
                     matchedEntry = curIdxEntry;
                 else{
-                    matchedEntry = prevIdxEntry;
-                    index.addFirst(curIdxEntry);  // We need to add back the 
current index Entry because we are reading ahead.
+                    matchedEntry = prevIdxEntry; 
+                    // start join from previous idx entry, it might have tuples
+                    // with this key                    
+                    index.addFirst(prevIdxEntry);
                 }
                 break;
             }
@@ -168,29 +175,24 @@
                 throw new ExecException(errMsg,errCode,PigException.BUG);
             }
         }
-        initRightLoader(matchedEntry);
-    }
-    
-    private void initRightLoader(Tuple idxEntry) throws IOException{
-
-        // bind loader to file pointed by this index Entry.
-        int keysCnt = idxEntry.size();
-        Long offset = (Long)idxEntry.get(keysCnt-1);
-        if(offset > 0)
-            // Loader will throw away one tuple if we are in the middle of the 
block. We don't want that.
-            offset -= 1 ;
-        FileSpec lFile = new 
FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec);
-        currentFileName = lFile.getFileName();
-        loader = 
(LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
-        is = FileLocalizer.open(currentFileName, offset, pc);
-        if (currentFileName.endsWith(".bz") || 
currentFileName.endsWith(".bz2")) {
-            is = new CBZip2InputStream((SeekableInputStream)is, 9);
-        } else if (currentFileName.endsWith(".gz")) {
-            is = new GZIPInputStream(is);
-        }
-
-        
-//        loader.bindTo(currentFileName , new 
BufferedPositionedInputStream(is), offset, Long.MAX_VALUE);
+        //add remaining split indexes to splitsAhead array
+        int [] splitsAhead = new int[index.size()];
+        int splitsAheadIdx = 0;
+        for(Tuple t : index){
+            splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
+        }
+        
+        initRightLoader(splitsAhead);
+    }
+    
+    private void initRightLoader(int [] splitsToBeRead) throws IOException{
+        //create ReadToEndLoader that will read the given splits in order
+        loader = new ReadToEndLoader(
+                
(LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
+                ConfigurationUtil.toConfiguration(pc.getProperties()),
+                inpLocation,
+                splitsToBeRead
+        );
     }
 
     private Object extractKeysFromIdxTuple(Tuple idxTuple) throws 
ExecException{
@@ -200,8 +202,9 @@
         if(idxTupSize == 3)
             return idxTuple.get(0);
         
-        List<Object> list = new ArrayList<Object>(idxTupSize-2);
-        for(int i=0; i<idxTupSize-2;i++)
+        int numColsInKey = (idxTupSize - 2);
+        List<Object> list = new ArrayList<Object>(numColsInKey);
+        for(int i=0; i < numColsInKey; i++)
             list.add(idxTuple.get(i));
 
         return mTupleFactory.newTupleNoCopy(list);
@@ -214,27 +217,11 @@
     @Override
     public Tuple getNext() throws IOException {
         Tuple t = loader.getNext();
-        if(t == null) {
-            while(true){                        // But next file may be same 
as previous one, because index may contain multiple entries for same file.
-                Tuple idxEntry = index.poll();
-                if(null == idxEntry) {           // Index is finished too. 
Right stream is finished. No more tuples.
-                    return null;
-                } else {                           
-                    
if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) {
-                        continue;
-                    } else {
-                        initRightLoader(idxEntry);      // bind loader to file 
and get tuple from it.
-                        return loader.getNext();    
-                    }
-                }
-           }
-        }
         return t;
     }
     
     @Override
     public void close() throws IOException {
-        is.close();
     }
 
     @Override
@@ -244,16 +231,23 @@
     }
 
     @Override
-    public InputFormat getInputFormat() {
-        return null;
+    public InputFormat getInputFormat() throws IOException {
+        throw new UnsupportedOperationException();
     }
     
     @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void prepareToRead(RecordReader reader, PigSplit split) {
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public void setLocation(String location, Job job) throws IOException {     
   
+    public void setLocation(String location, Job job) throws IOException {
+        throw new UnsupportedOperationException();
     }
     
 }

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
 Mon Nov 23 20:27:30 2009
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -67,13 +66,18 @@
      * the input location string (typically input file/dir name )
      */
     private String inputLocation;
+      
+    /**
+     * If the splits to be read are not in increasing sequence of integers
+     * this array can be used
+     */
+    private int[] toReadSplits = null;
     
     /**
-     * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)})
-     * to start reading from
+     * index into toReadSplits
      */
-    private int startSplitIndex;
-
+    private int toReadSplitsIdx = 0;
+    
     /**
      * the index of the split the loader is currently reading from
      */
@@ -82,60 +86,86 @@
     /**
      * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
      */
-    private List<InputSplit> splits;
+    private List<InputSplit> inpSplits = null;
     
     /**
      * underlying RecordReader
      */
-    private RecordReader reader;
+    private RecordReader reader = null;
     
     /**
      * underlying InputFormat
      */
-    private InputFormat inputFormat;
+    private InputFormat inputFormat = null;
     
     /**
      * @param wrappedLoadFunc
      * @param conf
      * @param inputLocation
-     * @param startSplitIndex
+     * @param splitIndex
      * @throws IOException 
      * @throws InterruptedException 
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
-            String inputLocation, int startSplitIndex) throws IOException {
+            String inputLocation, int splitIndex) throws IOException {
         this.wrappedLoadFunc = wrappedLoadFunc;
+        this.inputLocation = inputLocation;
+        this.conf = conf;
+        this.curSplitIndex = splitIndex;
+        init();
+    }
+    
+    /**
+     * This constructor takes an array of split indexes (toReadSplitIdxs) of the
+     * splits to be read.
+     * @param wrappedLoadFunc
+     * @param conf
+     * @param inputLocation
+     * @param toReadSplitIdxs
+     * @throws IOException 
+     * @throws InterruptedException 
+     */
+    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+            String inputLocation, int[] toReadSplitIdxs) throws IOException {
+        this.wrappedLoadFunc = wrappedLoadFunc;
+        this.inputLocation = inputLocation;
+        this.toReadSplits = toReadSplitIdxs;
+        this.conf = conf;
+        this.curSplitIndex =
+            toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : 
Integer.MAX_VALUE;
+        init();
+    }
+    
+    private void init() throws IOException {
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
-        this.conf = new Configuration(conf);
-        this.inputLocation = inputLocation;
-        this.startSplitIndex = startSplitIndex;
-        this.curSplitIndex = startSplitIndex;
-        
+        conf = new Configuration(conf);
         // let's initialize the wrappedLoadFunc 
-        Job job = new Job(this.conf);
-        wrappedLoadFunc.setLocation(this.inputLocation, 
+        Job job = new Job(conf);
+        wrappedLoadFunc.setLocation(inputLocation, 
                 job);
         // The above setLocation call could write to the conf within
         // the job - get a hold of the modified conf
-        this.conf = job.getConfiguration();
+        conf = job.getConfiguration();
         inputFormat = wrappedLoadFunc.getInputFormat();
         try {
-            splits = inputFormat.getSplits(new JobContext(this.conf,
+            inpSplits = inputFormat.getSplits(new JobContext(conf,
                     new JobID()));
         } catch (InterruptedException e) {
             throw new IOException(e);
-        }
+        }        
     }
-    
+
     private boolean initializeReader() throws IOException, 
     InterruptedException {
-        if(curSplitIndex > splits.size() - 1) {
+        if(curSplitIndex > inpSplits.size() - 1) {
             // past the last split, we are done
             return false;
         }
-        
-        InputSplit curSplit = splits.get(curSplitIndex);
+        if(reader != null){
+            reader.close();
+        }
+        InputSplit curSplit = inpSplits.get(curSplitIndex);
         TaskAttemptContext tAContext = new TaskAttemptContext(conf, 
                 new TaskAttemptID());
         reader = inputFormat.createRecordReader(curSplit, tAContext);
@@ -164,8 +194,7 @@
                 }
                 // if loadfunc returned null, we need to read next split
                 // if there is one
-                reader.close();
-                curSplitIndex++;
+                updateCurSplitIndex();
                 return getNextHelper();
             }
         } catch (InterruptedException e) {
@@ -179,22 +208,41 @@
             t = wrappedLoadFunc.getNext();
             if(t == null) {
                 // try next split
-                curSplitIndex++;
+                updateCurSplitIndex();
             } else {
                 return t;
             }
         }
         return null;
     }
+    
+    
+    /**
+     * Updates curSplitIndex: just increments it if toReadSplits is null,
+     * else sets it to the next split index listed in toReadSplits
+     */
+    private void updateCurSplitIndex() {
+        if(toReadSplits == null){
+            ++curSplitIndex;
+        }else{
+            ++toReadSplitsIdx;
+            if(toReadSplitsIdx >= toReadSplits.length){
+                // finished all the splits in the toReadSplits array
+                curSplitIndex = Integer.MAX_VALUE;
+            }else{
+                curSplitIndex = toReadSplits[toReadSplitsIdx];
+            }
+        }
+    }
 
     @Override
-    public InputFormat getInputFormat() {
-        return null;
+    public InputFormat getInputFormat() throws IOException {
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public LoadCaster getLoadCaster() {
-        return null;
+    public LoadCaster getLoadCaster() throws IOException {
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -206,5 +254,7 @@
     public void setLocation(String location, Job job) throws IOException {
         throw new UnsupportedOperationException();       
     }
+
+
    
 }


Reply via email to