Author: pradeepkth
Date: Thu Oct 29 18:00:26 2009
New Revision: 831051

URL: http://svn.apache.org/viewvc?rev=831051&view=rev
Log:
added missing files from previous commit for PIG-953

Added:
    hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java
    hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
    hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java
    hadoop/pig/trunk/src/org/apache/pig/SortInfo.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java

Added: hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java Thu Oct 29 
18:00:26 2009
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+/**
+ * A storefunc which has an extra commit() method which is called
+ * when all mappers (when the storefunc is part of map) or reducers (when the
+ * storefunc is part of reduce) are finished. Currently this will allow
+ * storefuncs
+ * to do any cleanup/finalizing activities knowing that all the maps/reducers
+ * have finished - one such use case is for zebra storage to build an index
+ * for sorted files once all writes are done.
+ */
+public interface CommittableStoreFunc extends StoreFunc {
+    /**
+     * This method is called when all mappers (when the storefunc is part of
+     * map) or reducers (when the storefunc is part of reduce) are finished.
+     * This allows the storeFunc to do any global commit actions - only called
+     * when all mappers/reducers successfully complete.
+     *
+     * If the StoreFunc needs to get hold of the StoreConfig object for the
+     * store it can call
+     * {@link MapRedUtil#getStoreConfig(org.apache.hadoop.mapred.JobConf)}
+     * where conf is the Configuration object passed in the commit() call.
+     *
+     * @param conf Configuration object for the job
+     * @throws IOException declared so that implementations can report a
+     * failure of their commit actions
+     */
+    public void commit(Configuration conf) throws IOException;
+}

Added: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Thu Oct 29 
18:00:26 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is intended for use by LoadFunc implementations
+ * which have an internal index for sorted data and can use the index
+ * to support merge join in pig. Interaction with the index 
+ * is abstracted away by the methods in this interface which the pig
+ * runtime will call in a particular sequence to get the records it
+ * needs to perform the merge based join.
+ * 
+ * The sequence of calls made from the pig runtime are:
+ * 
+ * {@link IndexableLoadFunc#initialize(Configuration)}
+ * IndexableLoadFunc.bindTo(filename, bufferedPositionedInputStream, 0,
+ * Long.MAX_VALUE);
+ * (the bufferedPositionedInputStream is a decorator around the underlying
+ * DFS input stream)
+ * IndexableLoadFunc.seekNear(keys);
+ * A series of IndexableLoadFunc.getNext(); calls to perform the join
+ * IndexableLoadFunc.close(); 
+ * 
+ */
+public interface IndexableLoadFunc extends LoadFunc {
+    
+    /**
+     * This method is called by the pig runtime to allow the
+     * IndexableLoadFunc to perform any initialization actions.
+     *
+     * @param conf The job configuration object
+     * @throws IOException declared so that implementations can report a
+     * failure of their initialization actions
+     */
+    public void initialize(Configuration conf) throws IOException;
+
+    /**
+     * This method is called by the pig runtime to indicate
+     * to the LoadFunc to position its underlying input stream
+     * near the keys supplied as the argument. Specifically:
+     * 1) if the keys are present in the input stream, the loadfunc
+     * implementation should position its read position to
+     * a record where the key(s) is/are the biggest key(s) less than
+     * the key(s) supplied in the argument OR to the record with the
+     * first occurrence of the key(s) supplied.
+     * 2) if the key(s) are absent in the input stream, the implementation
+     * should position its read position to a record where the key(s)
+     * is/are the biggest key(s) less than the key(s) supplied OR to the
+     * first record where the key(s) is/are the smallest key(s) greater
+     * than the key(s) supplied.
+     * The description above holds for descending order data in
+     * a similar manner with "biggest" and "less than" replaced with
+     * "smallest" and "greater than" and vice versa.
+     *
+     * @param keys Tuple with join keys (which are a prefix of the sort
+     * keys of the input data). For example if the data is sorted on
+     * columns in position 2,4,5 any of the following Tuples are
+     * valid as an argument value:
+     * (fieldAt(2))
+     * (fieldAt(2), fieldAt(4))
+     * (fieldAt(2), fieldAt(4), fieldAt(5))
+     *
+     * The following are some invalid cases:
+     * (fieldAt(4))
+     * (fieldAt(2), fieldAt(5))
+     * (fieldAt(4), fieldAt(5))
+     *
+     * @throws IOException When the loadFunc is unable to position
+     * to the required point in its input stream
+     */
+    public void seekNear(Tuple keys) throws IOException;
+    
+    
+    /**
+     * A method called by the pig runtime to give an opportunity
+     * for implementations to perform cleanup actions like closing
+     * the underlying input stream. This is necessary since while
+     * performing a join the pig runtime may determine that no further
+     * join is possible with the remaining records and may indicate to the
+     * IndexableLoadFunc implementation to clean up by calling this method.
+     *
+     * @throws IOException if the loadfunc is unable to perform
+     * its close actions.
+     */
+    public void close() throws IOException;
+}

Added: hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java Thu Oct 29 18:00:26 
2009
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A class representing information about a sort column to pass
+ * in {@link SortInfo} to storage functions in {@link StoreConfig}
+ */
+public class SortColInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Direction in which a column is sorted. */
+    public enum Order { ASCENDING, DESCENDING }
+
+    // name of sort column; may be null or empty (see getColName())
+    private final String colName;
+
+    // index position (0 based) of sort column
+    private final int colIndex;
+
+    // sort direction; the constructor does not validate it, so it may be null
+    private final Order sortOrder;
+
+    /**
+     * @param colName sort column name
+     * @param colIndex index position (0 based) of sort column
+     * @param orderingType whether the column is sorted ascending or descending
+     */
+    public SortColInfo(String colName, int colIndex, Order orderingType) {
+        this.colName = colName;
+        this.colIndex = colIndex;
+        this.sortOrder = orderingType;
+    }
+
+    /**
+     * @return the sort column name - could be null or empty string if
+     * column name could not be determined either because of the absence of
+     * a schema or because the schema had the column name as null or empty
+     * string - caller should check for these conditions.
+     */
+    public String getColName() {
+        return colName;
+    }
+
+    /**
+     * @return index position (0 based) of sort column
+     */
+    public int getColIndex() {
+        return colIndex;
+    }
+
+    /**
+     * @return whether the column is sorted ascending or descending
+     */
+    public Order getSortOrder() {
+        return sortOrder;
+    }
+
+    /**
+     * Combines column name, column index and sort order.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((colName == null) ? 0 : colName.hashCode());
+        result = prime * result + colIndex;
+        // any order other than ASCENDING (including null) contributes 2;
+        // kept as is so that hash values do not change
+        result = prime * result + ((sortOrder == Order.ASCENDING) ? 1 : 2);
+        return result;
+    }
+
+    /**
+     * Two instances are equal when column name, column index and sort
+     * order all match.
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!Utils.checkNullAndClass(this, obj)) {
+            return false;
+        }
+        SortColInfo other = (SortColInfo) obj;
+        return Utils.checkNullEquals(this.colName, other.colName, true)
+                && this.colIndex == other.colIndex
+                && this.sortOrder == other.sortOrder;
+    }
+
+    /**
+     * Renders the column name, index and sort order. A null sort order is
+     * printed as "null" instead of being misreported as "descending".
+     *
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        String orderText;
+        if (sortOrder == null) {
+            orderText = "null";
+        } else if (sortOrder == Order.ASCENDING) {
+            orderText = "ascending";
+        } else {
+            orderText = "descending";
+        }
+        return "colname:" + colName + ",colindex:" + colIndex
+                + ",orderingType:" + orderText;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/SortInfo.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortInfo.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SortInfo.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SortInfo.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Class to communicate sort column information based on
+ * order by statement's sort columns and schema
+ */
+public class SortInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // in pig this is the default
+    boolean isGloballySorted = true;
+
+    // sort columns exactly as handed to the constructor; may be null
+    List<SortColInfo> sortColInfoList;
+
+    /**
+     * @param sortColInfoList the sort columns; the list is stored as is
+     * (not copied) and may be null
+     */
+    public SortInfo(List<SortColInfo> sortColInfoList){
+        this.sortColInfoList = sortColInfoList;
+    }
+
+    /**
+     * @return a new list holding the sort columns, so that callers cannot
+     * modify the internal list. An empty list is returned when this object
+     * was created with a null list (hashCode() and equals() already accept
+     * that state, so the getter must not fail on it either).
+     */
+    public List<SortColInfo> getSortColInfoList() {
+        if (sortColInfoList == null) {
+            return new ArrayList<SortColInfo>();
+        }
+        return new ArrayList<SortColInfo>(sortColInfoList);
+    }
+
+    /**
+     * Combines the sort column list and the global-sort flag.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result
+                + ((sortColInfoList == null) ? 0 : sortColInfoList.hashCode());
+        result = prime * result + (isGloballySorted ? 1 : 0);
+        return result;
+    }
+
+    /**
+     * @return the isGloballySorted
+     */
+    public boolean isGloballySorted() {
+        return isGloballySorted;
+    }
+
+    /**
+     * Two instances are equal when the global-sort flag and the sort
+     * column lists match.
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!Utils.checkNullAndClass(this, obj)) {
+            return false;
+        }
+        SortInfo other = (SortInfo) obj;
+        return isGloballySorted == other.isGloballySorted
+                && Utils.checkNullEquals(sortColInfoList,
+                        other.sortColInfoList, true);
+    }
+
+    /**
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return "GlobalSort:" + isGloballySorted
+                + ", sort column info list:" + sortColInfoList;
+    }
+}

Added: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=831051&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 Thu Oct 29 18:00:26 2009
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.pig.CommittableStoreFunc;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * A specialization of the default FileOutputCommitter to allow
+ * pig to call commit() on the StoreFunc's associated with the stores
+ * in a job IF the StoreFunc's are CommittableStoreFunc's
+ */
+...@suppresswarnings("deprecation")
+public class PigOutputCommitter extends FileOutputCommitter {
+    
+    /* (non-Javadoc)
+     * @see 
org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext)
+     */
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        // the following is needed to correctly deserialize udfs in
+        // the map and reduce plans below
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
+                deserialize(conf.get("udf.import.list")));
+        super.cleanupJob(context);
+        
+        
+        // call commit() on the StoreFunc's associated with the stores
+        // in the job IF the StoreFunc's are CommittableStoreFunc's
+        // look for storeFuncs in the conf - there are two cases
+        // 1) Job with single store - in this case, there would be storefunc
+        // stored in the conf which we can use
+        // 2) Multi store case - in this case, there is no single storefunc
+        // in the conf - instead we would need to look at the
+        // map and reduce plans and get the POStores out of it and then get 
hold
+        // of the respective StoreFuncs
+        String sFuncString = conf.get("pig.storeFunc");
+        PhysicalPlan mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                    conf.get("pig.mapPlan"));
+        List<POStore> mapStores = PlanHelper.getStores(mp);
+        PhysicalPlan rp = (PhysicalPlan) ObjectSerializer.deserialize(
+                    conf.get("pig.reducePlan"));
+        List<POStore> reduceStores = new ArrayList<POStore>();
+        if(rp != null) {
+            reduceStores = PlanHelper.getStores(rp);    
+        }
+        // In single store case, we would have removed the store from the
+        // plan in JobControlCompiler
+        if(sFuncString != null && (mapStores.size() + reduceStores.size() == 
0)) {
+            // single store case
+            StoreFunc sFunc = MapRedUtil.getStoreFunc(new JobConf(conf));
+            commit(sFunc, conf, conf.get(JobControlCompiler.PIG_STORE_CONFIG),
+                    sFuncString);
+        } else {
+            // multi store case
+            commitStores(mapStores, conf);
+            commitStores(reduceStores, conf);
+            
+        }
+    }
+
+    private void commit(StoreFunc sFunc, Configuration conf,
+            StoreConfig storeConfig, String sFuncString) throws IOException {
+        if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom(
+                sFunc.getClass())) {
+            CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc;
+            // make a copy of the conf since we may be committing multiple
+            // stores and set storeFunc and StoreConfig 
+            // pertaining to this store in the copy and use it
+            Configuration confCopy = new Configuration(conf);
+            confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+                    sFuncString));
+            confCopy.set(JobControlCompiler.PIG_STORE_CONFIG, 
+                    ObjectSerializer.serialize(storeConfig));
+            
+            csFunc.commit(confCopy);
+        }
+    }
+    
+    private void commit(StoreFunc sFunc, Configuration conf,
+            String storeConfigSerializedString, String sFuncString) throws 
IOException {
+        if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom(
+                sFunc.getClass())) {
+            CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc;
+            // make a copy of the conf since we may be committing multple
+            // sores and set storeFunc and StoreConfig 
+            // pertaining to this store in the copy and use it
+            Configuration confCopy = new Configuration(conf);
+            confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+                    sFuncString));
+            confCopy.set(JobControlCompiler.PIG_STORE_CONFIG, 
+                    storeConfigSerializedString);
+            
+            csFunc.commit(confCopy);
+        }
+    }
+    
+    private void commitStores(List<POStore> stores, Configuration conf)
+    throws IOException {
+        for (POStore store : stores) {
+            StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(
+                    store.getSFile().getFuncSpec());
+            StoreConfig storeConfig = new StoreConfig(store.getSFile().
+                    getFileName(), store.getSchema(), store.getSortInfo());
+            commit(sFunc, conf, storeConfig, 
+                    store.getSFile().getFuncSpec().toString());
+        }
+    }
+}

Added: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=831051&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java 
(added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java 
Thu Oct 29 18:00:26 2009
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+/**
+ *
+ */
+public class DefaultIndexableLoader implements IndexableLoadFunc {
+
+    
+    // FileSpec of index file which will be read from HDFS.
+    private String indexFile;
+    private String indexFileLoadFuncSpec;
+    
+    private LoadFunc loader;
+    // Index is modeled as a FIFO queue; LinkedList implements the java
+    // Queue interface.
+    private LinkedList<Tuple> index;
+    private FuncSpec rightLoaderFuncSpec;
+    private PigContext pc;
+    private String scope;
+    private Tuple dummyTuple = null;
+    private transient TupleFactory mTupleFactory;
+    private InputStream  is;
+    private String currentFileName;
+    
+    /**
+     * Stores the settings needed later to read the index and to load the
+     * indexed data.
+     *
+     * @param loaderFuncSpec func spec string of the loader for the indexed
+     * (right side) data
+     * @param indexFile name of the index file
+     * @param indexFileLoadFuncSpec func spec string of the loader used to
+     * read the index file
+     * @param scope scope used when generating operator keys
+     */
+    public DefaultIndexableLoader(
+            String loaderFuncSpec,
+            String indexFile,
+            String indexFileLoadFuncSpec,
+            String scope) {
+        this.scope = scope;
+        this.indexFile = indexFile;
+        this.indexFileLoadFuncSpec = indexFileLoadFuncSpec;
+        this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec);
+    }
+    
+    /**
+     * Reads the whole index file into memory, finds the index entry to
+     * start reading from for the supplied join key(s) and binds the right
+     * side loader to the file and offset recorded in that entry.
+     *
+     * The entry chosen is the one just before the first index entry whose
+     * key is greater than or equal to the supplied key (or that first entry
+     * itself when nothing precedes it); when no such entry exists the last
+     * index entry is used.
+     *
+     * @param keys Tuple with the join key(s)
+     * @throws IOException as ExecException with error code 2094 when the
+     * pig context cannot be deserialized, 2165 when no index entry could be
+     * selected, or 2166 when the type of the supplied key differs from the
+     * type of the key found in the index
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void seekNear(Tuple keys) throws IOException{
+        
+        // some setup
+        mTupleFactory = TupleFactory.getInstance();
+
+        /* Currently the whole index is read into memory. Typically the index
+           is small, usually a few KBs in size, so this should not be an issue.
+           However, reading the whole index at startup time is not required,
+           so this can be improved upon.
+           Assumption: the index being read is sorted on keys followed by
+           filename, followed by offset.
+         */
+
+        // Index is modeled as a FIFO Queue, which frees us from keeping
+        // track of which index entry should be read next.
+        
+        // the keys are sent in a tuple. If there is really only
+        // 1 join key, it would be the first field of the tuple. If
+        // there are multiple Join keys, the tuple itself represents
+        // the join key
+        Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
+        
+        // operator used to read the index file with the index loader
+        POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new 
FuncSpec(indexFileLoadFuncSpec)), false);
+        try {
+            pc = 
(PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
+        } catch (IOException e) {
+            int errCode = 2094;
+            String msg = "Unable to deserialize pig context.";
+            throw new ExecException(msg,errCode,e);
+        }
+        pc.connect();
+        ld.setPc(pc);
+        index = new LinkedList<Tuple>();
+        // queue every tuple produced by the index loader until end of input.
+        // NOTE(review): only STATUS_EOP ends this loop; other return statuses
+        // are not inspected here - confirm that is intended.
+        for(Result 
res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
+            index.offer((Tuple) res.result);   
+
+        Tuple prevIdxEntry = null;
+        Tuple matchedEntry;
+     
+        // When the first call is made, we need to seek into the right input
+        // at the correct offset.
+        while(true){
+            // Keep looping till we find first entry in index >= left key
+            // then return the prev idx entry.
+
+            Tuple curIdxEntry = index.poll();
+            if(null == curIdxEntry){
+                // It is possible that we hit the end of the index without
+                // encountering an idx entry >= left key; in that case return
+                // the last index entry.
+                matchedEntry = prevIdxEntry;
+                break;
+            }
+            Object extractedKey = extractKeysFromIdxTuple(curIdxEntry);
+            if(extractedKey == null){
+                // entry with a null key: remember it as the previous entry
+                // and move on to the next one
+                prevIdxEntry = curIdxEntry;
+                continue;
+            }
+            
+            if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
+                if(null == prevIdxEntry)   // very first entry in index.
+                    matchedEntry = curIdxEntry;
+                else{
+                    matchedEntry = prevIdxEntry;
+                    // We need to add back the current index entry because
+                    // we are reading ahead.
+                    index.addFirst(curIdxEntry);
+                }
+                break;
+            }
+            else
+                prevIdxEntry = curIdxEntry;
+        }
+
+        // no entry selected: the index queue was empty
+        if(matchedEntry == null){
+            
+            int errCode = 2165;
+            String errMsg = "Problem in index construction.";
+            throw new ExecException(errMsg,errCode,PigException.BUG);
+        }
+        
+        Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
+        
+        if(extractedKey != null){
+            Class idxKeyClass = extractedKey.getClass();
+            if( ! firstLeftKey.getClass().equals(idxKeyClass)){
+
+                // This check should indeed be done at compile time. But to
+                // be on the safe side, we do it at runtime also.
+                int errCode = 2166;
+                String errMsg = "Key type mismatch. Found key of type 
"+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of 
type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
+                throw new ExecException(errMsg,errCode,PigException.BUG);
+            }
+        }
+        // open the file named in the matched entry and bind the loader to it
+        initRightLoader(matchedEntry);
+    }
+    
+    /**
+     * Binds the right-side loader to the file and offset recorded in the
+     * given index entry. The last field of the entry is read as the offset
+     * (a Long) and the second-to-last field as the file name.
+     *
+     * @param idxEntry index entry describing where reading should start
+     * @throws IOException if the previously opened stream cannot be closed
+     * or the new file cannot be opened or bound
+     */
+    private void initRightLoader(Tuple idxEntry) throws IOException{
+
+        // getNext() calls this method each time it moves on to another file,
+        // so release the stream of the previously bound file instead of
+        // silently dropping the reference to it.
+        if(is != null)
+            is.close();
+
+        // bind loader to file pointed by this index Entry.
+        int keysCnt = idxEntry.size();
+        long offset = (Long)idxEntry.get(keysCnt-1);
+        if(offset > 0)
+            // Loader will throw away one tuple if we are in the middle of the block. We don't want that.
+            offset -= 1 ;
+        FileSpec lFile = new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec);
+        currentFileName = lFile.getFileName();
+        loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
+        is = FileLocalizer.open(currentFileName, offset, pc);
+        // Wrap the raw stream with a decompressor chosen by file extension.
+        if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) {
+            is = new CBZip2InputStream((SeekableInputStream)is, 9);
+        } else if (currentFileName.endsWith(".gz")) {
+            is = new GZIPInputStream(is);
+        }
+
+        // Read from the computed offset with Long.MAX_VALUE as the end bound.
+        loader.bindTo(currentFileName , new BufferedPositionedInputStream(is), offset, Long.MAX_VALUE);
+    }
+
+    private Object extractKeysFromIdxTuple(Tuple idxTuple) throws 
ExecException{
+
+        int idxTupSize = idxTuple.size();
+
+        if(idxTupSize == 3)
+            return idxTuple.get(0);
+        
+        List<Object> list = new ArrayList<Object>(idxTupSize-2);
+        for(int i=0; i<idxTupSize-2;i++)
+            list.add(idxTuple.get(i));
+
+        return mTupleFactory.newTupleNoCopy(list);
+    }
+
+    /**
+     * Creates an operator key in this loader's scope, using the next node id
+     * handed out by the node id generator for that scope.
+     *
+     * @return a newly created operator key
+     */
+    private OperatorKey genKey(){
+        NodeIdGenerator idGenerator = NodeIdGenerator.getGenerator();
+        return new OperatorKey(scope, idGenerator.getNextNodeId(scope));
+    }
+    
+    /**
+     * Does nothing; all arguments are ignored. The wrapped loader is bound
+     * to its input inside initRightLoader() instead.
+     *
+     * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
+     */
+    @Override
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
+        // empty on purpose of the original author: no statements here
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
+     */
+    @Override
+    public DataBag bytesToBag(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
+     */
+    @Override
+    public String bytesToCharArray(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
+     */
+    @Override
+    public Double bytesToDouble(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
+     */
+    @Override
+    public Float bytesToFloat(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
+     */
+    @Override
+    public Integer bytesToInteger(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
+     */
+    @Override
+    public Long bytesToLong(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
+     */
+    @Override
+    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
+     */
+    @Override
+    public Tuple bytesToTuple(byte[] b) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Not supported by this loader; every call fails and no argument is
+     * inspected.
+     *
+     * @throws IOException always, with the message "Method not implemented by design"
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+     */
+    @Override
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        throw new IOException("Method not implemented by design");
+    }
+
+    /**
+     * Does nothing; the supplied schema is ignored.
+     *
+     * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+     */
+    @Override
+    public void fieldsToRead(Schema schema) {
+        // no statements: the schema argument is not used
+    }
+
+    /**
+     * Returns the next tuple of the right-side input. When the currently
+     * bound file has no more tuples, index entries are consumed until one
+     * names a different file; the loader is rebound to that file and reading
+     * continues there. This repeats until a tuple is found or the index is
+     * exhausted, so a file that yields no tuple does not end the stream
+     * while further index entries remain.
+     *
+     * @return the next tuple, or null once the index has no entries left
+     * @throws IOException if reading a tuple or rebinding the loader fails
+     * @see org.apache.pig.LoadFunc#getNext()
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        Tuple t = loader.getNext();
+        while(t == null) {
+            Tuple idxEntry = index.poll();
+            if(null == idxEntry) {
+                // Index is finished too. Right stream is finished. No more tuples.
+                return null;
+            }
+            // Next file may be same as previous one, because index may contain multiple entries for same file.
+            if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) {
+                continue;
+            }
+            // bind loader to file and get tuple from it; if that file gives
+            // nothing, the loop moves on to the next index entry.
+            initRightLoader(idxEntry);
+            t = loader.getNext();
+        }
+        return t;
+    }
+    
+    /**
+     * Closes the input stream of the currently bound file, if one has been
+     * opened.
+     *
+     * @throws IOException if closing the stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        // The stream is only assigned in initRightLoader(); guard against
+        // close() being called before any file was bound.
+        if(is != null)
+            is.close();
+    }
+
+    /**
+     * Does nothing; the configuration is ignored.
+     *
+     * @see org.apache.pig.IndexableLoadFunc#initialize(org.apache.hadoop.conf.Configuration)
+     */
+    @Override
+    public void initialize(Configuration conf) throws IOException {
+        // nothing to do
+    }
+    
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+/**
+ * Static helper methods for implementing
+ * {@link java.lang.Object#equals(java.lang.Object)}.
+ */
+public class Utils {
+
+    /**
+     * Compares two references for null-ness and, optionally, for equality.
+     * If either argument is null, the result is true only when both are
+     * null. If both are non-null, the result is true unless
+     * <code>checkEquality</code> is set and <code>obj1.equals(obj2)</code>
+     * is false.
+     *
+     * @param obj1 first object to be compared
+     * @param obj2 second object to be compared
+     * @param checkEquality flag to indicate whether object equality should
+     * be checked if obj1 and obj2 are non-null
+     * @return true if the two objects are considered equal under the rules
+     * above, false otherwise
+     */
+    public static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) {
+        final boolean firstIsNull = (obj1 == null);
+        final boolean secondIsNull = (obj2 == null);
+        if(firstIsNull || secondIsNull) {
+            // At least one side is null: equal only when the other is too.
+            return firstIsNull && secondIsNull;
+        }
+        // Both sides are non-null; equals() is consulted only on request.
+        return !checkEquality || obj1.equals(obj2);
+    }
+
+
+    /**
+     * Checks whether the two arguments are both null, or both non-null and
+     * of exactly the same class.
+     *
+     * @param obj1 first object to compare
+     * @param obj2 second object to compare
+     * @return true if both objects are null, or if both are non-null and
+     * their classes are identical; false otherwise
+     */
+    public static boolean checkNullAndClass(Object obj1, Object obj2) {
+        if(!checkNullEquals(obj1, obj2, false)) {
+            // exactly one of the two is null
+            return false;
+        }
+        // Here the arguments are either both null or both non-null.
+        return obj1 == null || obj1.getClass() == obj2.getClass();
+    }
+
+
+}


Reply via email to