Author: pradeepkth Date: Thu Oct 29 18:00:26 2009 New Revision: 831051 URL: http://svn.apache.org/viewvc?rev=831051&view=rev Log: added missing files from previous commit for PIG-953
Added: hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java hadoop/pig/trunk/src/org/apache/pig/SortInfo.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Added: hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; + +/** + * A storefunc which has an extra commit() method which is called + * when all mappers (when the storefunc is part of map) or reducers (when the + * storefunc is part of reduce) are finished. Currently this will allow storefuncs + * to do any cleanup/finalizing activities knowing that all the maps/reducers + * have finished - one such use case is for zebra storage to build an index + * for sorted files once all writes are done. + */ +public interface CommittableStoreFunc extends StoreFunc { + /** + * This method is called when all mappers (when the storefunc is part of + * map) or reducers (when the storefunc is part of reduce) are finished. + * This allows the storeFunc to do any global commit actions - only called + * when all mappers/reducers successfully complete. + * + * If the StoreFunc needs to get hold of StoreConfig object for the store + * it can call {@link MapRedUtil#getStoreConfig(org.apache.hadoop.mapred.JobConf)} where + * conf is the Configuration object passed in the commit() call. + * + * @param conf Configuration object for the job + */ + public void commit(Configuration conf) throws IOException; +} Added: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.data.Tuple; + +/** + * This interface is intended for use by LoadFunc implementations + * which have an internal index for sorted data and can use the index + * to support merge join in pig. Interaction with the index + * is abstracted away by the methods in this interface which the pig + * runtime will call in a particular sequence to get the records it + * needs to perform the merge based join. 
+ * + * The sequence of calls made from the pig runtime is: + * + * {@link IndexableLoadFunc#initialize(Configuration)} + * IndexableLoadFunc.bindTo(filename, bufferedPositionedInputStream, 0, Long.MAX_VALUE); + * (the bufferedPositionedInputStream is a decorator around the underlying + * DFS input stream) + * IndexableLoadFunc.seekNear(keys); + * A series of IndexableLoadFunc.getNext(); calls to perform the join + * IndexableLoadFunc.close(); + * + */ +public interface IndexableLoadFunc extends LoadFunc { + + /** + * This method is called by pig run time to allow the + * IndexableLoadFunc to perform any initialization actions + * @param conf The job configuration object + */ + public void initialize(Configuration conf) throws IOException; + + /** + * This method is called by the pig runtime to indicate + * to the LoadFunc to position its underlying input stream + * near the keys supplied as the argument. Specifically: + * 1) if the keys are present in the input stream, the loadfunc + * implementation should position its read position to + * a record where the key(s) is/are the biggest key(s) less than + * the key(s) supplied in the argument OR to the record with the + * first occurrence of the key(s) supplied. + * 2) if the key(s) are absent in the input stream, the implementation + * should position its read position to a record where the key(s) + * is/are the biggest key(s) less than the key(s) supplied OR to the + * first record where the key(s) is/are the smallest key(s) greater + * than the key(s) supplied. + * The description above holds for descending order data in + * a similar manner with "biggest" and "less than" replaced with + * "smallest" and "greater than" and vice versa. + * + * @param keys Tuple with join keys (which are a prefix of the sort + * keys of the input data).
For example if the data is sorted on + * columns in position 2,4,5 any of the following Tuples are + * valid as an argument value: + * (fieldAt(2)) + * (fieldAt(2), fieldAt(4)) + * (fieldAt(2), fieldAt(4), fieldAt(5)) + * + * The following are some invalid cases: + * (fieldAt(4)) + * (fieldAt(2), fieldAt(5)) + * (fieldAt(4), fieldAt(5)) + * + * @throws IOException When the loadFunc is unable to position + * to the required point in its input stream + */ + public void seekNear(Tuple keys) throws IOException; + + + /** + * A method called by the pig runtime to give an opportunity + * for implementations to perform cleanup actions like closing + * the underlying input stream. This is necessary since while + * performing a join the pig run time may determine that no further + * join is possible with remaining records and may indicate to the + * IndexableLoader to clean up by calling this method. + * + * @throws IOException if the loadfunc is unable to perform + * its close actions. + */ + public void close() throws IOException; +} Added: hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig; + +import java.io.Serializable; + +import org.apache.pig.impl.util.Utils; + +/** + * A class representing information about a sort column to pass + * in {@link SortInfo} to storage functions in {@link StoreConfig} + */ +public class SortColInfo implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + // name of sort column + private String colName; + + // index position (0 based) of sort column + private int colIndex; + + public enum Order { ASCENDING, DESCENDING } + + private Order sortOrder; + + + /** + * @param colName sort column name + * @param colIndex index position (0 based) of sort column + * @param orderingType whether the column is sorted ascending or descending + */ + public SortColInfo(String colName, int colIndex, Order orderingType) { + this.colName = colName; + this.colIndex = colIndex; + this.sortOrder = orderingType; + } + + /** + * @return the sort column name - could be null or empty string if + * column name could not be determined either because of the absence of + * a schema or because the schema had the column name as null or empty + * string - caller should check for these conditions.
+ */ + public String getColName() { + return colName; + } + + /** + * @return index position (0 based) of sort column + */ + public int getColIndex() { + return colIndex; + } + + /** + * @return whether the column is sorted ascending or descending + */ + public Order getSortOrder() { + return sortOrder; + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((colName == null) ? 0 : colName.hashCode()); + result = prime * result + colIndex; + result = prime * result + ((sortOrder == Order.ASCENDING) ? 1 : 2); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if(!Utils.checkNullAndClass(this, obj)) { + return false; + } + SortColInfo other = (SortColInfo)obj; + return Utils.checkNullEquals(this.colName, other.colName, true) && + this.colIndex == other.colIndex && + this.sortOrder == other.sortOrder; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "colname:" + colName +",colindex:" + colIndex + ",orderingType:" + + (sortOrder == Order.ASCENDING ? "ascending" : "descending"); + } + + + + +} Added: hadoop/pig/trunk/src/org/apache/pig/SortInfo.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortInfo.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/SortInfo.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/SortInfo.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.apache.pig.impl.util.Utils; + +/** + * Class to communicate sort column information based on + * order by statement's sort columns and schema + */ +public class SortInfo implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + boolean isGloballySorted = true; // in pig this is the default + + List<SortColInfo> sortColInfoList; + + /** + * @param sortColInfoList + */ + public SortInfo(List<SortColInfo> sortColInfoList){ + this.sortColInfoList = sortColInfoList; + } + + /** + * @return the sortColInfoList + */ + public List<SortColInfo> getSortColInfoList() { + return new ArrayList<SortColInfo>(sortColInfoList); + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((sortColInfoList == null) ? 0 : + sortColInfoList.hashCode()); + result = prime * result + (isGloballySorted ?
1: 0); + return result; + } + + /** + * @return the isGloballySorted + */ + public boolean isGloballySorted() { + return isGloballySorted; + } + + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if(!Utils.checkNullAndClass(this, obj)) { + return false; + } + SortInfo other = (SortInfo)obj; + return ( + isGloballySorted == other.isGloballySorted && + Utils.checkNullEquals(sortColInfoList, other.sortColInfoList, true)); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "GlobalSort:" + isGloballySorted +", sort column info list:" + sortColInfoList; + } + + + +} Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.pig.CommittableStoreFunc; +import org.apache.pig.StoreConfig; +import org.apache.pig.StoreFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.ObjectSerializer; + +/** + * A specialization of the default FileOutputCommitter to allow + * pig to call commit() on the StoreFunc's associated with the stores + * in a job IF the StoreFunc's are CommittableStoreFunc's + */ +@SuppressWarnings("deprecation") +public class PigOutputCommitter extends FileOutputCommitter { + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext) + */ + @SuppressWarnings({ "unchecked", "deprecation" }) + @Override + public void cleanupJob(JobContext context) throws IOException { + Configuration conf = context.getConfiguration(); + // the following is needed to correctly deserialize udfs in + //
the map and reduce plans below + PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer. + deserialize(conf.get("udf.import.list"))); + super.cleanupJob(context); + + + // call commit() on the StoreFunc's associated with the stores + // in the job IF the StoreFunc's are CommittableStoreFunc's + // look for storeFuncs in the conf - there are two cases + // 1) Job with single store - in this case, there would be storefunc + // stored in the conf which we can use + // 2) Multi store case - in this case, there is no single storefunc + // in the conf - instead we would need to look at the + // map and reduce plans and get the POStores out of it and then get hold + // of the respective StoreFuncs + String sFuncString = conf.get("pig.storeFunc"); + PhysicalPlan mp = (PhysicalPlan) ObjectSerializer.deserialize( + conf.get("pig.mapPlan")); + List<POStore> mapStores = PlanHelper.getStores(mp); + PhysicalPlan rp = (PhysicalPlan) ObjectSerializer.deserialize( + conf.get("pig.reducePlan")); + List<POStore> reduceStores = new ArrayList<POStore>(); + if(rp != null) { + reduceStores = PlanHelper.getStores(rp); + } + // In single store case, we would have removed the store from the + // plan in JobControlCompiler + if(sFuncString != null && (mapStores.size() + reduceStores.size() == 0)) { + // single store case + StoreFunc sFunc = MapRedUtil.getStoreFunc(new JobConf(conf)); + commit(sFunc, conf, conf.get(JobControlCompiler.PIG_STORE_CONFIG), + sFuncString); + } else { + // multi store case + commitStores(mapStores, conf); + commitStores(reduceStores, conf); + + } + } + + private void commit(StoreFunc sFunc, Configuration conf, + StoreConfig storeConfig, String sFuncString) throws IOException { + if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom( + sFunc.getClass())) { + CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc; + // make a copy of the conf since we may be committing multiple + // stores and set storeFunc and StoreConfig + // pertaining to 
this store in the copy and use it + Configuration confCopy = new Configuration(conf); + confCopy.set("pig.storeFunc", ObjectSerializer.serialize( + sFuncString)); + confCopy.set(JobControlCompiler.PIG_STORE_CONFIG, + ObjectSerializer.serialize(storeConfig)); + + csFunc.commit(confCopy); + } + } + + private void commit(StoreFunc sFunc, Configuration conf, + String storeConfigSerializedString, String sFuncString) throws IOException { + if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom( + sFunc.getClass())) { + CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc; + // make a copy of the conf since we may be committing multiple + // stores and set storeFunc and StoreConfig + // pertaining to this store in the copy and use it + Configuration confCopy = new Configuration(conf); + confCopy.set("pig.storeFunc", ObjectSerializer.serialize( + sFuncString)); + confCopy.set(JobControlCompiler.PIG_STORE_CONFIG, + storeConfigSerializedString); + + csFunc.commit(confCopy); + } + } + + private void commitStores(List<POStore> stores, Configuration conf) + throws IOException { + for (POStore store : stores) { + StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec( + store.getSFile().getFuncSpec()); + StoreConfig storeConfig = new StoreConfig(store.getSFile().
+ getFileName(), store.getSchema(), store.getSortInfo()); + commit(sFunc, conf, storeConfig, + store.getSFile().getFuncSpec().toString()); + } + } +} Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.builtin; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.ExecType; +import org.apache.pig.FuncSpec; +import org.apache.pig.IndexableLoadFunc; +import org.apache.pig.LoadFunc; +import org.apache.pig.PigException; +import org.apache.pig.backend.datastorage.DataStorage; +import org.apache.pig.backend.datastorage.SeekableInputStream; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.tools.bzip2r.CBZip2InputStream; + +/** + * + */ +public class DefaultIndexableLoader implements IndexableLoadFunc { + + + // FileSpec of index file which will be read from HDFS. + private String indexFile; + private String indexFileLoadFuncSpec; + + private LoadFunc loader; + // Index is modeled as FIFO queue and LinkedList implements java Queue interface. 
+ private LinkedList<Tuple> index; + private FuncSpec rightLoaderFuncSpec; + private PigContext pc; + private String scope; + private Tuple dummyTuple = null; + private transient TupleFactory mTupleFactory; + private InputStream is; + private String currentFileName; + + public DefaultIndexableLoader(String loaderFuncSpec, String indexFile, String indexFileLoadFuncSpec, String scope) { + this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec); + this.indexFile = indexFile; + this.indexFileLoadFuncSpec = indexFileLoadFuncSpec; + this.scope = scope; + } + + @SuppressWarnings("unchecked") + @Override + public void seekNear(Tuple keys) throws IOException{ + + // some setup + mTupleFactory = TupleFactory.getInstance(); + + /* Currently whole of index is read into memory. Typically, index is small. Usually + few KBs in size. So, this should not be an issue. + However, reading whole index at startup time is not required. So, this can be improved upon. + Assumption: Index being read is sorted on keys followed by filename, followed by offset. + */ + + // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next. + + // the keys are sent in a tuple. If there is really only + // 1 join key, it would be the first field of the tuple. If + // there are multiple Join keys, the tuple itself represents + // the join key + Object firstLeftKey = (keys.size() == 1 ? 
keys.get(0): keys); + + POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)), false); + try { + pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext")); + } catch (IOException e) { + int errCode = 2094; + String msg = "Unable to deserialize pig context."; + throw new ExecException(msg,errCode,e); + } + pc.connect(); + ld.setPc(pc); + index = new LinkedList<Tuple>(); + for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)) + index.offer((Tuple) res.result); + + Tuple prevIdxEntry = null; + Tuple matchedEntry; + + // When the first call is made, we need to seek into right input at correct offset. + while(true){ + // Keep looping till we find first entry in index >= left key + // then return the prev idx entry. + + Tuple curIdxEntry = index.poll(); + if(null == curIdxEntry){ + // Its possible that we hit end of index and still doesn't encounter + // idx entry >= left key, in that case return last index entry. + matchedEntry = prevIdxEntry; + break; + } + Object extractedKey = extractKeysFromIdxTuple(curIdxEntry); + if(extractedKey == null){ + prevIdxEntry = curIdxEntry; + continue; + } + + if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){ + if(null == prevIdxEntry) // very first entry in index. + matchedEntry = curIdxEntry; + else{ + matchedEntry = prevIdxEntry; + index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead. + } + break; + } + else + prevIdxEntry = curIdxEntry; + } + + if(matchedEntry == null){ + + int errCode = 2165; + String errMsg = "Problem in index construction."; + throw new ExecException(errMsg,errCode,PigException.BUG); + } + + Object extractedKey = extractKeysFromIdxTuple(matchedEntry); + + if(extractedKey != null){ + Class idxKeyClass = extractedKey.getClass(); + if( ! 
firstLeftKey.getClass().equals(idxKeyClass)){ + + // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also. + int errCode = 2166; + String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side."; + throw new ExecException(errMsg,errCode,PigException.BUG); + } + } + initRightLoader(matchedEntry); + } + + private void initRightLoader(Tuple idxEntry) throws IOException{ + + // bind loader to file pointed by this index Entry. + int keysCnt = idxEntry.size(); + Long offset = (Long)idxEntry.get(keysCnt-1); + if(offset > 0) + // Loader will throw away one tuple if we are in the middle of the block. We don't want that. + offset -= 1 ; + FileSpec lFile = new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec); + currentFileName = lFile.getFileName(); + loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()); + is = FileLocalizer.open(currentFileName, offset, pc); + if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) { + is = new CBZip2InputStream((SeekableInputStream)is, 9); + } else if (currentFileName.endsWith(".gz")) { + is = new GZIPInputStream(is); + } + + + loader.bindTo(currentFileName , new BufferedPositionedInputStream(is), offset, Long.MAX_VALUE); + } + + private Object extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{ + + int idxTupSize = idxTuple.size(); + + if(idxTupSize == 3) + return idxTuple.get(0); + + List<Object> list = new ArrayList<Object>(idxTupSize-2); + for(int i=0; i<idxTupSize-2;i++) + list.add(idxTuple.get(i)); + + return mTupleFactory.newTupleNoCopy(list); + } + + private OperatorKey genKey(){ + return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, 
org.apache.pig.impl.io.BufferedPositionedInputStream, long, long) + */ + @Override + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + + + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToBag(byte[]) + */ + @Override + public DataBag bytesToBag(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[]) + */ + @Override + public String bytesToCharArray(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToDouble(byte[]) + */ + @Override + public Double bytesToDouble(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToFloat(byte[]) + */ + @Override + public Float bytesToFloat(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToInteger(byte[]) + */ + @Override + public Integer bytesToInteger(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToLong(byte[]) + */ + @Override + public Long bytesToLong(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToMap(byte[]) + */ + @Override + public Map<String, Object> bytesToMap(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToTuple(byte[]) + */ + @Override + public Tuple bytesToTuple(byte[] b) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see 
org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage) + */ + @Override + public Schema determineSchema(String fileName, ExecType execType, + DataStorage storage) throws IOException { + throw new IOException("Method not implemented by design"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema) + */ + @Override + public void fieldsToRead(Schema schema) { + + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getNext() + */ + @Override + public Tuple getNext() throws IOException { + Tuple t = loader.getNext(); + if(t == null) { + while(true){ // But next file may be same as previous one, because index may contain multiple entries for same file. + Tuple idxEntry = index.poll(); + if(null == idxEntry) { // Index is finished too. Right stream is finished. No more tuples. + return null; + } else { + if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) { + continue; + } else { + initRightLoader(idxEntry); // bind loader to file and get tuple from it. 
+ return loader.getNext(); + } + } + } + } + return t; + } + + @Override + public void close() throws IOException { + is.close(); + } + + /* (non-Javadoc) + * @see org.apache.pig.IndexableLoadFunc#initialize(org.apache.hadoop.conf.Configuration) + */ + @Override + public void initialize(Configuration conf) throws IOException { + // nothing to do + + } + +} Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=831051&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Oct 29 18:00:26 2009 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.util; + +/** + * Class with utility static methods + */ +public class Utils { + + /** + * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)} + * checks if two objects are equal - two levels of checks are + * made - first if both are null or not null. If either is null, + * check is made whether both are null.
+ * If both are non null, equality also is checked if so indicated + * @param obj1 first object to be compared + * @param obj2 second object to be compared + * @param checkEquality flag to indicate whether object equality should + * be checked if obj1 and obj2 are non-null + * @return true if the two objects are equal + * false otherwise + */ + public static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) { + if(obj1 == null || obj2 == null) { + return obj1 == obj2; + } + if(checkEquality) { + if(!obj1.equals(obj2)) { + return false; + } + } + return true; + } + + + /** + * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)} + * The method checks whether the two arguments are both null or both not null and + * whether they are of the same class + * @param obj1 first object to compare + * @param obj2 second object to compare + * @return true if both objects are null or both are not null + * and if both are of the same class if not null + * false otherwise + */ + public static boolean checkNullAndClass(Object obj1, Object obj2) { + if(checkNullEquals(obj1, obj2, false)) { + if(obj1 != null) { + return obj1.getClass() == obj2.getClass(); + } else { + return true; // both obj1 and obj2 should be null + } + } else { + return false; + } + } + + +}