Author: pradeepkth Date: Mon Nov 23 20:27:30 2009 New Revision: 883486 URL: http://svn.apache.org/viewvc?rev=883486&view=rev Log: PIG-1088:change merge join and merge join indexer to work with new LoadFunc interface (thejas via pradeepkth)
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Removed: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original) +++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Mon Nov 23 20:27:30 2009 @@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES +PIG-1088: change merge join and merge join indexer to work with new LoadFunc +interface (thejas via pradeepkth) + PIG-879: Pig should provide a 
way for input location string in load statement to be passed as-is to the Loader (rding via pradeepkth) Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java?rev=883486&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java Mon Nov 23 20:27:30 2009 @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig; + +import java.io.IOException; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; + +/** + * This class provides an implementation of OrderedLoadFunc interface + * which can be optionally re-used by LoadFuncs that use FileInputFormat, by + * having this as a super class + */ +public abstract class FileInputLoadFunc extends OrderedLoadFunc { + + @Override + public WritableComparable<?> getSplitComparable(PigSplit split) + throws IOException{ + FileSplit fileSplit = null; + if(split.getWrappedSplit() instanceof FileSplit){ + fileSplit = (FileSplit)split.getWrappedSplit(); + }else{ + throw new RuntimeException("LoadFunc expected split of type FileSplit"); + } + + return new FileSplitComparable( + fileSplit.getPath().toString(), + fileSplit.getStart() + ); + } + +} + + Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java?rev=883486&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java Mon Nov 23 20:27:30 2009 @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.pig.data.DataReaderWriter; + + +/** + * This class can be used to represent a relative position in a file. + * compareTo(other) function of WritaleComparable interface is used to compare + * position of different objects of this class. + */ +public class FileSplitComparable implements WritableComparable<FileSplitComparable>, Serializable{ + + private static final long serialVersionUID = 1L; + protected String filename; + protected Long offset; + + //need a default constructor to be able to de-serialize using just + // the Writable interface + public FileSplitComparable(){} + + public FileSplitComparable(String fileName, long offset){ + this.filename = fileName; + this.offset = offset; + } + + + @Override + public int compareTo(FileSplitComparable other) { + int rc = filename.compareTo(other.filename); + if (rc == 0) + rc = Long.signum(offset - other.offset); + return rc; + } + + + @Override + public void readFields(DataInput in) throws IOException { + filename = (String) DataReaderWriter.readDatum(in); + offset = (Long)DataReaderWriter.readDatum(in); + } + + + @Override + public void write(DataOutput out) throws IOException { + DataReaderWriter.writeDatum(out, filename); + DataReaderWriter.writeDatum(out, offset); + } + + @Override + public String toString(){ + return "FileName: '" + filename + "' Offset: " + offset; + } + + /* 
(non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((filename == null) ? 0 : filename.hashCode()); + result = prime * result + ((offset == null) ? 0 : offset.hashCode()); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + FileSplitComparable other = (FileSplitComparable) obj; + if (filename == null) { + if (other.filename != null) + return false; + } else if (!filename.equals(other.filename)) + return false; + if (offset == null) { + if (other.offset != null) + return false; + } else if (!offset.equals(other.offset)) + return false; + return true; + } +} \ No newline at end of file Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java?rev=883486&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java Mon Nov 23 20:27:30 2009 @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig; + +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; + +/** + * Implementing this interface indicates to Pig that a given loader + * can be used for MergeJoin. The position as represented by the + * WritableComparable object is stored in the index created by + * MergeJoin sampling MR job to get an ordered sequence of splits. + * This is necessary when the sort key spans multiple splits. + */ +public abstract class OrderedLoadFunc extends LoadFunc { + + /** + * The WritableComparable object returned will be used to compare + * the position of different splits in an ordered stream + * @param split + * @return WritableComparable representing the position of the split in input + * @throws IOException + */ + public abstract WritableComparable<?> getSplitComparable(PigSplit split) + throws IOException; + +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java Mon Nov 23 20:27:30 2009 @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pig; - -import java.io.IOException; - -import org.apache.pig.data.Tuple; -import org.apache.pig.LoadFunc; - -/** - * Implementing this interface indicates to Pig that a given loader can be - * used by a sampling loader. The requirement for this is that the loader - * can handle a getNext() call without knowing the position in the file. - * This will not be the case for loaders that handle structured data such - * as XML where they must start at the beginning of the file in order to - * understand their position. Record oriented loaders such as PigStorage - * can handle this by seeking to the next record delimiter and starting - * from that point. Another requirement is that the loader be able to - * skip or seek in its input stream. - */ -public abstract class SamplableLoader extends LoadFunc { - - /** - * Skip ahead in the input stream. - * @param n number of bytes to skip - * @return number of bytes actually skipped. The return semantics are - * exactly the same as {...@link java.io.InpuStream#skip(long)} - */ - public abstract long skip(long n) throws IOException; - - /** - * Get the current position in the stream. - * @return position in the stream. 
- */ - public abstract long getPosition() throws IOException; - - /** - * Get the next tuple from the stream starting from the current - * read position. - * The loader implementation should not assume that current read position - * in the stream is at the beginning of a record since this method is called - * for sampling and the current read position in the stream could be anywhere - * in the stream. - * @return the next tuple from underlying input stream or null if there are no more tuples - * to be processed. - */ - public abstract Tuple getSampledTuple() throws IOException; -} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 23 20:27:30 2009 @@ -34,6 +34,7 @@ import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.LoadFunc; +import org.apache.pig.OrderedLoadFunc; import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.builtin.BinStorage; @@ -42,7 +43,6 @@ import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.PoissonSampleLoader; -import org.apache.pig.impl.builtin.MergeJoinIndexer; import org.apache.pig.impl.builtin.GetMemNumRows; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.builtin.RandomSampleLoader; @@ -56,7 +56,6 @@ import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; @@ -1183,17 +1182,25 @@ String[] indexerArgs = new String[3]; FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); - + if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof OrderedLoadFunc)){ + int errCode = 1104; + String errMsg = "Right input of merge-join must implement " + + "OrderedLoadFunc interface. The specified loader " + + indexerArgs[0] + " doesn't implement it"; + throw new MRCompilerException(errMsg,errCode); + } List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1); indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans); indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan); FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)); rightLoader.setLFile(lFile); - // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset) + // Loader of mro will return a tuple of form - + // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer // Now set up a POLocalRearrange which has "all" as the key and tuple fetched // by loader as the "value" of POLocalRearrange - // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. 
If that is so, + // Sorting of index can possibly be achieved by using Hadoop sorting + // between map and reduce instead of Pig doing sort. If that is so, // it will simplify lot of the code below. PhysicalPlan lrPP = new PhysicalPlan(); @@ -1259,11 +1266,12 @@ rightMROpr.setReduceDone(true); // set up the DefaultIndexableLoader for the join operator - String[] defaultIndexableLoaderArgs = new String[4]; + String[] defaultIndexableLoaderArgs = new String[5]; defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); defaultIndexableLoaderArgs[1] = strFile.getFileName(); defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString(); defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope; + defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName(); joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs))); joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=883486&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Mon Nov 23 20:27:30 2009 @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.LoadCaster; +import org.apache.pig.LoadFunc; +import org.apache.pig.OrderedLoadFunc; +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.ObjectSerializer; + +/** Merge Join indexer is used to generate on the fly index for doing Merge Join efficiently. 
+ * It samples first record from every block of right side input. + * and returns tuple in the following format : + * (key0, key1,...,position,splitIndex) + * These tuples are then sorted before being written out to index file on HDFS. + */ +public class MergeJoinIndexer extends LoadFunc{ + + private boolean firstRec = true; + private transient TupleFactory mTupleFactory; + private POLocalRearrange lr; + private PhysicalPlan precedingPhyPlan; + private int keysCnt; + private PhysicalOperator rightPipelineLeaf; + private PhysicalOperator rightPipelineRoot; + private Tuple dummyTuple = null; + private OrderedLoadFunc loader; + private PigSplit pigSplit = null; + + /** @param funcSpec : Loader specification. + * @param innerPlan : This is serialized version of LR plan. We + * want to keep only keys in our index file and not the whole tuple. So, we need LR and thus its plan + * to get keys out of the sampled tuple. + * @param serializedPhyPlan Serialized physical plan on right side. + * @throws ExecException + */ + @SuppressWarnings("unchecked") + public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{ + + loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); + try { + List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan); + lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer"))); + lr.setPlans(innerPlans); + keysCnt = innerPlans.size(); + precedingPhyPlan = (PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan); + if(precedingPhyPlan != null){ + if(precedingPhyPlan.getLeaves().size() != 1 || precedingPhyPlan.getRoots().size() != 1){ + int errCode = 2168; + String errMsg = "Expected physical plan with exactly one root and one leaf."; + throw new ExecException(errMsg,errCode,PigException.BUG); + } + this.rightPipelineLeaf = precedingPhyPlan.getLeaves().get(0); + this.rightPipelineRoot 
= precedingPhyPlan.getRoots().get(0); + this.rightPipelineRoot.setInputs(null); + } + } + catch (IOException e) { + int errCode = 2094; + String msg = "Unable to deserialize plans in Indexer."; + throw new ExecException(msg,errCode,e); + } + mTupleFactory = TupleFactory.getInstance(); + } + + @Override + public Tuple getNext() throws IOException { + + if(!firstRec) // We sample only one record per block. + return null; + WritableComparable<?> position = loader.getSplitComparable(pigSplit); + Object key = null; + Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2); + + while(true){ + Tuple readTuple = loader.getNext(); + + if(null == readTuple){ // We hit the end. + + for(int i =0; i < keysCnt; i++) + wrapperTuple.set(i, null); + wrapperTuple.set(keysCnt, position); + firstRec = false; + return wrapperTuple; + } + + if (null == precedingPhyPlan){ + + lr.attachInput(readTuple); + key = ((Tuple)lr.getNext(dummyTuple).result).get(1); + lr.detachInput(); + if ( null == key) // Tuple with null key. Drop it. + continue; + break; + } + + // There is a physical plan. + + rightPipelineRoot.attachInput(readTuple); + boolean fetchNewTup; + + while(true){ + + Result res = rightPipelineLeaf.getNext(dummyTuple); + switch(res.returnStatus){ + + case POStatus.STATUS_OK: + + lr.attachInput((Tuple)res.result); + key = ((Tuple)lr.getNext(dummyTuple).result).get(1); + lr.detachInput(); + if ( null == key) // Tuple with null key. Drop it. + continue; + fetchNewTup = false; + break; + + case POStatus.STATUS_EOP: + fetchNewTup = true; + break; + + default: + int errCode = 2164; + String errMsg = "Expected EOP/OK as return status. 
Found: "+res.returnStatus; + throw new ExecException(errMsg,errCode); + } + break; + } + if (!fetchNewTup) + break; + } + + if(key instanceof Tuple){ + Tuple tupKey = (Tuple)key; + for(int i =0; i < tupKey.size(); i++) + wrapperTuple.set(i, tupKey.get(i)); + } + + else + wrapperTuple.set(0, key); + + wrapperTuple.set(keysCnt, position); + wrapperTuple.set(keysCnt+1, pigSplit.getSplitIndex()); + firstRec = false; + return wrapperTuple; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getInputFormat() + */ + @Override + public InputFormat getInputFormat() throws IOException { + return loader.getInputFormat(); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getLoadCaster() + */ + @Override + public LoadCaster getLoadCaster() throws IOException { + return loader.getLoadCaster(); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit) + */ + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + loader.prepareToRead(reader, split); + pigSplit = split; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job) + */ + @Override + public void setLocation(String location, Job job) throws IOException { + loader.setLocation(location, job); + } +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original) +++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Mon Nov 23 20:27:30 2009 @@ -58,8 +58,6 @@ transient LoadFunc loader = null; // The filespec on which the operator is based FileSpec lFile; - // The stream used to bind to by the loader - transient InputStream is; // PigContext passed to us by the operator creator PigContext pc; //Indicates whether the loader setup is done or not @@ -107,13 +105,7 @@ ConfigurationUtil.toConfiguration(pc.getProperties()), filename, 0); - - // XXX : FIXME need to get this to work with new loadfunc interface - the - // below code is for merge join - hopefully we will no longer need it - // and then we can just get rid of it and the rest should be fine. - is = (this.offset == 0) ? FileLocalizer.open(filename, pc) : FileLocalizer.open(filename, this.offset,pc); - -// loader.bindTo(filename , new BufferedPositionedInputStream(is), this.offset, Long.MAX_VALUE); + } @@ -123,7 +115,6 @@ * @throws IOException */ public void tearDown() throws IOException{ - is.close(); setUpDone = false; } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Mon Nov 23 20:27:30 2009 @@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.pig.FileInputLoadFunc; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; import org.apache.pig.PigException; @@ -54,8 
+55,9 @@ import org.apache.pig.impl.io.BinStorageRecordWriter; import org.apache.pig.impl.util.LogUtils; -public class BinStorage extends LoadFunc - implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc { +public class BinStorage extends FileInputLoadFunc +implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc { + public static final int RECORD_1 = 0x01; public static final int RECORD_2 = 0x02; Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Mon Nov 23 20:27:30 2009 @@ -37,6 +37,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.pig.FileInputLoadFunc; +import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; @@ -55,8 +57,9 @@ * delimiter is given as a regular expression. See String.split(delimiter) and * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information. 
*/ -public class PigStorage extends LoadFunc - implements ReversibleLoadStoreFunc, StoreFunc { +public class PigStorage +extends FileInputLoadFunc +implements ReversibleLoadStoreFunc, StoreFunc { protected RecordReader in = null; protected RecordWriter writer = null; protected final Log mLog = LogFactory.getLog(getClass()); Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java Mon Nov 23 20:27:30 2009 @@ -20,12 +20,14 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; import org.apache.pig.backend.executionengine.ExecException; /** @@ -98,6 +100,30 @@ return new String(ba, DataReaderWriter.UTF8); } + public static Writable bytesToWritable(DataInput in) throws IOException { + String className = (String) readDatum(in); + // create the writeable class . It needs to have a default constructor + Class<?> objClass = null ; + try { + objClass = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IOException("Could not find class " + className + + ", while attempting to de-serialize it ", e); + } + Writable writable = null; + try { + writable = (Writable) objClass.newInstance(); + } catch (Exception e) { + String msg = "Could create instance of class " + className + + ", while attempting to de-serialize it. 
(no default constructor ?)"; + throw new IOException(msg, e); + } + + //read the fields of the object from DataInput + writable.readFields(in); + return writable; + } + public static Object readDatum(DataInput in) throws IOException, ExecException { // Read the data type byte b = in.readByte(); @@ -149,7 +175,10 @@ case DataType.CHARARRAY: return bytesToCharArray(in); - + + case DataType.GENERIC_WRITABLECOMPARABLE : + return bytesToWritable(in); + case DataType.NULL: return null; @@ -261,6 +290,13 @@ } break; } + case DataType.GENERIC_WRITABLECOMPARABLE : + out.writeByte(DataType.GENERIC_WRITABLECOMPARABLE); + //store the class name, so we know the class to create on read + writeDatum(out, val.getClass().getName()); + Writable writable = (Writable)val; + writable.write(out); + break; case DataType.NULL: out.writeByte(DataType.NULL); Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java Mon Nov 23 20:27:30 2009 @@ -24,8 +24,8 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import java.util.Set; +import org.apache.hadoop.io.WritableComparable; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -62,6 +62,12 @@ public static final byte MAP = 100; public static final byte TUPLE = 110; public static final byte BAG = 120; + + // internal use only; used to store WriteableComparable objects + // for creating ordered index in MergeJoin. 
Expecting a object that + // implements Writable interface and has default constructor + public static final byte GENERIC_WRITABLECOMPARABLE = 123; + public static final byte INTERNALMAP = 127; // internal use only; for maps that are object->object. Used by FindQuantiles. public static final byte ERROR = -1; @@ -86,6 +92,7 @@ else if (o instanceof Double) return DOUBLE; else if (o instanceof Boolean) return BOOLEAN; else if (o instanceof Byte) return BYTE; + else if (o instanceof WritableComparable) return GENERIC_WRITABLECOMPARABLE; else {return ERROR;} } @@ -135,6 +142,7 @@ } else { interfaces = ioeInterfaces; } + boolean matchedWritableComparable = false; for (int i = 0; i < interfaces.length; i++) { if (interfaces[i].getName().equals("org.apache.pig.data.Tuple")) { return TUPLE; @@ -142,8 +150,13 @@ return BAG; } else if (interfaces[i].getName().equals("java.util.Map")) { return MAP; - } + } else if (interfaces[i].getName().equals("org.apache.hadoop.io.WritableComparable")) { + // use GENERIC_WRITABLECOMPARABLE type only as last resort + matchedWritableComparable = true; + } } + if(matchedWritableComparable) + return GENERIC_WRITABLECOMPARABLE; return ERROR; } @@ -154,14 +167,19 @@ } public static byte[] genAllTypes(){ byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY, - DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.INTERNALMAP, + DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, + DataType.GENERIC_WRITABLECOMPARABLE, + DataType.INTEGER, DataType.INTERNALMAP, DataType.LONG, DataType.MAP, DataType.TUPLE}; return types; } private static String[] genAllTypeNames(){ - String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER", - "INTERNALMAP", "LONG", "MAP", "TUPLE" }; + String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY", + "CHARARRAY", "DOUBLE", "FLOAT", + "GENERIC_WRITABLECOMPARABLE", + 
"INTEGER","INTERNALMAP", + "LONG", "MAP", "TUPLE" }; return names; } @@ -215,6 +233,7 @@ case INTERNALMAP: return "internalmap"; case TUPLE: return "tuple"; case BAG: return "bag"; + case GENERIC_WRITABLECOMPARABLE: return "generic_writablecomparable"; default: return "Unknown"; } } @@ -253,7 +272,8 @@ (dataType == FLOAT) || (dataType == DOUBLE) || (dataType == BOOLEAN) || - (dataType == BYTE)); + (dataType == BYTE) || + (dataType == GENERIC_WRITABLECOMPARABLE)); } /** @@ -366,6 +386,9 @@ } } + case GENERIC_WRITABLECOMPARABLE: + return ((Comparable)o1).compareTo(o2); + case INTERNALMAP: return -1; // Don't think anyway will want to do this. @@ -374,6 +397,7 @@ case BAG: return ((DataBag)o1).compareTo((DataBag)o2); + default: throw new RuntimeException("Unkown type " + dt1 + Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Mon Nov 23 20:27:30 2009 @@ -18,14 +18,10 @@ package org.apache.pig.impl.builtin; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.zip.GZIPInputStream; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; @@ -34,8 +30,8 @@ import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; import org.apache.pig.PigException; -import 
org.apache.pig.backend.datastorage.SeekableInputStream; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -44,17 +40,19 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.ReadToEndLoader; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.util.ObjectSerializer; -import org.apache.tools.bzip2r.CBZip2InputStream; /** - * + * Used by MergeJoin . Takes an index on sorted data + * consisting of sorted tuples of the form + * (key1,key2..., position,splitIndex) as input. 
For key given in seekNear(Tuple) + * finds the splitIndex that can contain the key and initializes ReadToEndLoader + * to read from that splitIndex onwards , in the sequence of splits in the index */ -//XXX FIXME - make this work with new load-store redesign public class DefaultIndexableLoader extends IndexableLoadFunc { @@ -70,20 +68,26 @@ private String scope; private Tuple dummyTuple = null; private transient TupleFactory mTupleFactory; - private InputStream is; - private String currentFileName; + + private String inpLocation; - public DefaultIndexableLoader(String loaderFuncSpec, String indexFile, String indexFileLoadFuncSpec, String scope) { + public DefaultIndexableLoader( + String loaderFuncSpec, + String indexFile, + String indexFileLoadFuncSpec, + String scope, + String inputLocation + ) { this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec); this.indexFile = indexFile; this.indexFileLoadFuncSpec = indexFileLoadFuncSpec; this.scope = scope; + this.inpLocation = inputLocation; } @SuppressWarnings("unchecked") @Override public void seekNear(Tuple keys) throws IOException{ - // some setup mTupleFactory = TupleFactory.getInstance(); @@ -100,7 +104,6 @@ // there are multiple Join keys, the tuple itself represents // the join key Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys); - POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)), false); try { pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext")); @@ -115,6 +118,7 @@ for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)) index.offer((Tuple) res.result); + Tuple prevIdxEntry = null; Tuple matchedEntry; @@ -137,11 +141,14 @@ } if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){ + index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead. if(null == prevIdxEntry) // very first entry in index. 
matchedEntry = curIdxEntry; else{ - matchedEntry = prevIdxEntry; - index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead. + matchedEntry = prevIdxEntry; + // start join from previous idx entry, it might have tuples + // with this key + index.addFirst(prevIdxEntry); } break; } @@ -168,29 +175,24 @@ throw new ExecException(errMsg,errCode,PigException.BUG); } } - initRightLoader(matchedEntry); - } - - private void initRightLoader(Tuple idxEntry) throws IOException{ - - // bind loader to file pointed by this index Entry. - int keysCnt = idxEntry.size(); - Long offset = (Long)idxEntry.get(keysCnt-1); - if(offset > 0) - // Loader will throw away one tuple if we are in the middle of the block. We don't want that. - offset -= 1 ; - FileSpec lFile = new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec); - currentFileName = lFile.getFileName(); - loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()); - is = FileLocalizer.open(currentFileName, offset, pc); - if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) { - is = new CBZip2InputStream((SeekableInputStream)is, 9); - } else if (currentFileName.endsWith(".gz")) { - is = new GZIPInputStream(is); - } - - -// loader.bindTo(currentFileName , new BufferedPositionedInputStream(is), offset, Long.MAX_VALUE); + //add remaining split indexes to splitsAhead array + int [] splitsAhead = new int[index.size()]; + int splitsAheadIdx = 0; + for(Tuple t : index){ + splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 ); + } + + initRightLoader(splitsAhead); + } + + private void initRightLoader(int [] splitsToBeRead) throws IOException{ + //create ReadToEndLoader that will read the given splits in order + loader = new ReadToEndLoader( + (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec), + ConfigurationUtil.toConfiguration(pc.getProperties()), + inpLocation, + splitsToBeRead + ); } private Object 
extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{ @@ -200,8 +202,9 @@ if(idxTupSize == 3) return idxTuple.get(0); - List<Object> list = new ArrayList<Object>(idxTupSize-2); - for(int i=0; i<idxTupSize-2;i++) + int numColsInKey = (idxTupSize - 2); + List<Object> list = new ArrayList<Object>(numColsInKey); + for(int i=0; i < numColsInKey; i++) list.add(idxTuple.get(i)); return mTupleFactory.newTupleNoCopy(list); @@ -214,27 +217,11 @@ @Override public Tuple getNext() throws IOException { Tuple t = loader.getNext(); - if(t == null) { - while(true){ // But next file may be same as previous one, because index may contain multiple entries for same file. - Tuple idxEntry = index.poll(); - if(null == idxEntry) { // Index is finished too. Right stream is finished. No more tuples. - return null; - } else { - if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) { - continue; - } else { - initRightLoader(idxEntry); // bind loader to file and get tuple from it. - return loader.getNext(); - } - } - } - } return t; } @Override public void close() throws IOException { - is.close(); } @Override @@ -244,16 +231,23 @@ } @Override - public InputFormat getInputFormat() { - return null; + public InputFormat getInputFormat() throws IOException { + throw new UnsupportedOperationException(); } @Override + public LoadCaster getLoadCaster() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override public void prepareToRead(RecordReader reader, PigSplit split) { + throw new UnsupportedOperationException(); } @Override - public void setLocation(String location, Job job) throws IOException { + public void setLocation(String location, Job job) throws IOException { + throw new UnsupportedOperationException(); } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=883486&r1=883485&r2=883486&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Mon Nov 23 20:27:30 2009 @@ -22,7 +22,6 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -67,13 +66,18 @@ * the input location string (typically input file/dir name ) */ private String inputLocation; + + /** + * If the splits to be read are not in increasing sequence of integers + * this array can be used + */ + private int[] toReadSplits = null; /** - * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}) - * to start reading from + * index into toReadSplits */ - private int startSplitIndex; - + private int toReadSplitsIdx = 0; + /** * the index of the split the loader is currently reading from */ @@ -82,60 +86,86 @@ /** * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)} */ - private List<InputSplit> splits; + private List<InputSplit> inpSplits = null; /** * underlying RecordReader */ - private RecordReader reader; + private RecordReader reader = null; /** * underlying InputFormat */ - private InputFormat inputFormat; + private InputFormat inputFormat = null; /** * @param wrappedLoadFunc * @param conf * @param inputLocation - * @param startSplitIndex + * @param splitIndex * @throws IOException * @throws InterruptedException */ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, - String inputLocation, int startSplitIndex) throws IOException { + 
String inputLocation, int splitIndex) throws IOException { this.wrappedLoadFunc = wrappedLoadFunc; + this.inputLocation = inputLocation; + this.conf = conf; + this.curSplitIndex = splitIndex; + init(); + } + + /** + * This constructor takes an array of split indexes (toReadSplitIdxs) of the + * splits to be read. + * @param wrappedLoadFunc + * @param conf + * @param inputLocation + * @param toReadSplitIdxs + * @throws IOException + * @throws InterruptedException + */ + public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, + String inputLocation, int[] toReadSplitIdxs) throws IOException { + this.wrappedLoadFunc = wrappedLoadFunc; + this.inputLocation = inputLocation; + this.toReadSplits = toReadSplitIdxs; + this.conf = conf; + this.curSplitIndex = + toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE; + init(); + } + + private void init() throws IOException { // make a copy so that if the underlying InputFormat writes to the // conf, we don't affect the caller's copy - this.conf = new Configuration(conf); - this.inputLocation = inputLocation; - this.startSplitIndex = startSplitIndex; - this.curSplitIndex = startSplitIndex; - + conf = new Configuration(conf); // let's initialize the wrappedLoadFunc - Job job = new Job(this.conf); - wrappedLoadFunc.setLocation(this.inputLocation, + Job job = new Job(conf); + wrappedLoadFunc.setLocation(inputLocation, job); // The above setLocation call could write to the conf within // the job - get a hold of the modified conf - this.conf = job.getConfiguration(); + conf = job.getConfiguration(); inputFormat = wrappedLoadFunc.getInputFormat(); try { - splits = inputFormat.getSplits(new JobContext(this.conf, + inpSplits = inputFormat.getSplits(new JobContext(conf, new JobID())); } catch (InterruptedException e) { throw new IOException(e); - } + } } - + private boolean initializeReader() throws IOException, InterruptedException { - if(curSplitIndex > splits.size() - 1) { + if(curSplitIndex > 
inpSplits.size() - 1) { // past the last split, we are done return false; } - - InputSplit curSplit = splits.get(curSplitIndex); + if(reader != null){ + reader.close(); + } + InputSplit curSplit = inpSplits.get(curSplitIndex); TaskAttemptContext tAContext = new TaskAttemptContext(conf, new TaskAttemptID()); reader = inputFormat.createRecordReader(curSplit, tAContext); @@ -164,8 +194,7 @@ } // if loadfunc returned null, we need to read next split // if there is one - reader.close(); - curSplitIndex++; + updateCurSplitIndex(); return getNextHelper(); } } catch (InterruptedException e) { @@ -179,22 +208,41 @@ t = wrappedLoadFunc.getNext(); if(t == null) { // try next split - curSplitIndex++; + updateCurSplitIndex(); } else { return t; } } return null; } + + + /** + * Updates curSplitIndex , just increment if splitIndexes is null, + * else get next split in splitIndexes + */ + private void updateCurSplitIndex() { + if(toReadSplits == null){ + ++curSplitIndex; + }else{ + ++toReadSplitsIdx; + if(toReadSplitsIdx >= toReadSplits.length){ + // finished all the splits in splitIndexes array + curSplitIndex = Integer.MAX_VALUE; + }else{ + curSplitIndex = toReadSplits[toReadSplitsIdx]; + } + } + } @Override - public InputFormat getInputFormat() { - return null; + public InputFormat getInputFormat() throws IOException { + throw new UnsupportedOperationException(); } @Override - public LoadCaster getLoadCaster() { - return null; + public LoadCaster getLoadCaster() throws IOException { + throw new UnsupportedOperationException(); } @Override @@ -206,5 +254,7 @@ public void setLocation(String location, Job job) throws IOException { throw new UnsupportedOperationException(); } + + }