Author: pradeepkth
Date: Mon Nov 23 20:27:30 2009
New Revision: 883486
URL: http://svn.apache.org/viewvc?rev=883486&view=rev
Log:
PIG-1088: change merge join and merge join indexer to work with new LoadFunc interface (thejas via pradeepkth)
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
Removed:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
Modified:
hadoop/pig/branches/load-store-redesign/CHANGES.txt
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Mon Nov 23 20:27:30 2009
@@ -24,6 +24,9 @@
INCOMPATIBLE CHANGES
+PIG-1088: change merge join and merge join indexer to work with new LoadFunc
+interface (thejas via pradeepkth)
+
PIG-879: Pig should provide a way for input location string in load statement
to be passed as-is to the Loader (rding via pradeepkth)
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java?rev=883486&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileInputLoadFunc.java Mon Nov 23 20:27:30 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * This class provides an implementation of the OrderedLoadFunc interface
+ * which can optionally be re-used by LoadFuncs that use FileInputFormat,
+ * by having this as a super class
+ */
+public abstract class FileInputLoadFunc extends OrderedLoadFunc {
+
+ @Override
+ public WritableComparable<?> getSplitComparable(PigSplit split)
+ throws IOException{
+ FileSplit fileSplit = null;
+ if(split.getWrappedSplit() instanceof FileSplit){
+ fileSplit = (FileSplit)split.getWrappedSplit();
+ }else{
+ throw new RuntimeException("LoadFunc expected split of type FileSplit");
+ }
+
+ return new FileSplitComparable(
+ fileSplit.getPath().toString(),
+ fileSplit.getStart()
+ );
+ }
+
+}
+
+
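For a loader built on FileInputFormat, extending FileInputLoadFunc is all it takes to become merge-join capable, since getSplitComparable() is inherited. The sketch below is a hypothetical minimal loader (LineLoader is illustrative, not part of this commit); it assumes the LoadFunc contract as it appears on this branch, and returns null from getLoadCaster() as shorthand for "no casting support":

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.pig.FileInputLoadFunc;
    import org.apache.pig.LoadCaster;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class LineLoader extends FileInputLoadFunc {
        private RecordReader reader;
        private final TupleFactory tf = TupleFactory.getInstance();

        @Override
        public void setLocation(String location, Job job) throws IOException {
            FileInputFormat.setInputPaths(job, location);
        }

        @Override
        public InputFormat getInputFormat() throws IOException {
            // TextInputFormat produces FileSplits, which is exactly what the
            // inherited getSplitComparable() expects
            return new TextInputFormat();
        }

        @Override
        public LoadCaster getLoadCaster() throws IOException {
            return null;    // this sketch does not support casting
        }

        @Override
        public void prepareToRead(RecordReader reader, PigSplit split) {
            this.reader = reader;
        }

        @Override
        public Tuple getNext() throws IOException {
            try {
                if (!reader.nextKeyValue()) {
                    return null;    // end of this split
                }
                // one single-field tuple per input line
                return tf.newTuple(reader.getCurrentValue().toString());
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }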
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java?rev=883486&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/FileSplitComparable.java Mon Nov 23 20:27:30 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.data.DataReaderWriter;
+
+
+/**
+ * This class can be used to represent a relative position in a file.
+ * The compareTo(other) method of the WritableComparable interface is used
+ * to compare positions of different objects of this class.
+ */
+public class FileSplitComparable implements WritableComparable<FileSplitComparable>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ protected String filename;
+ protected Long offset;
+
+ //need a default constructor to be able to de-serialize using just
+ // the Writable interface
+ public FileSplitComparable(){}
+
+ public FileSplitComparable(String fileName, long offset){
+ this.filename = fileName;
+ this.offset = offset;
+ }
+
+
+ @Override
+ public int compareTo(FileSplitComparable other) {
+ int rc = filename.compareTo(other.filename);
+ if (rc == 0)
+ rc = Long.signum(offset - other.offset);
+ return rc;
+ }
+
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ filename = (String) DataReaderWriter.readDatum(in);
+ offset = (Long)DataReaderWriter.readDatum(in);
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ DataReaderWriter.writeDatum(out, filename);
+ DataReaderWriter.writeDatum(out, offset);
+ }
+
+ @Override
+ public String toString(){
+ return "FileName: '" + filename + "' Offset: " + offset;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((filename == null) ? 0 : filename.hashCode());
+ result = prime * result + ((offset == null) ? 0 : offset.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FileSplitComparable other = (FileSplitComparable) obj;
+ if (filename == null) {
+ if (other.filename != null)
+ return false;
+ } else if (!filename.equals(other.filename))
+ return false;
+ if (offset == null) {
+ if (other.offset != null)
+ return false;
+ } else if (!offset.equals(other.offset))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
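The ordering implemented above compares file names first and offsets second, so index entries sort into file order and then byte order within a file. A quick hypothetical check (OrderDemo is illustrative only; run with java -ea to enable the assertions):

    import org.apache.pig.FileSplitComparable;

    public class OrderDemo {
        public static void main(String[] args) {
            FileSplitComparable a = new FileSplitComparable("/data/part-00000", 0L);
            FileSplitComparable b = new FileSplitComparable("/data/part-00000", 4096L);
            FileSplitComparable c = new FileSplitComparable("/data/part-00001", 0L);

            assert a.compareTo(b) < 0;  // same file: smaller offset sorts first
            assert b.compareTo(c) < 0;  // different files: file name decides
            assert a.compareTo(a) == 0 && a.equals(a);
        }
    }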
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java?rev=883486&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/OrderedLoadFunc.java Mon Nov 23 20:27:30 2009
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Implementing this interface indicates to Pig that a given loader
+ * can be used for MergeJoin. The position as represented by the
+ * WritableComparable object is stored in the index created by
+ * the MergeJoin sampling MR job to get an ordered sequence of splits.
+ * This is necessary when the sort key spans multiple splits.
+ */
+public abstract class OrderedLoadFunc extends LoadFunc {
+
+ /**
+ * The WritableComparable object returned will be used to compare
+ * the position of different splits in an ordered stream
+ * @param split
+ * @return WritableComparable representing the position of the split in input
+ * @throws IOException
+ */
+ public abstract WritableComparable<?> getSplitComparable(PigSplit split)
+ throws IOException;
+
+}
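A loader whose splits are not FileSplits can still be merge-join capable by implementing getSplitComparable() directly; any WritableComparable that totally orders the splits will do. A hypothetical sketch under that assumption (KeyedSplit and RowRangeLoader are illustrative, not part of Pig):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.pig.OrderedLoadFunc;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

    /** Hypothetical split type that knows the first key it covers. */
    interface KeyedSplit {
        String getStartKey();
    }

    abstract class RowRangeLoader extends OrderedLoadFunc {
        @Override
        public WritableComparable<?> getSplitComparable(PigSplit split)
                throws IOException {
            InputSplit wrapped = split.getWrappedSplit();
            if (!(wrapped instanceof KeyedSplit)) {
                throw new IOException("RowRangeLoader expected a KeyedSplit");
            }
            // Text is WritableComparable, so index entries sort by start key
            return new Text(((KeyedSplit) wrapped).getStartKey());
        }
    }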
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/SamplableLoader.java Mon Nov 23 20:27:30 2009
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig;
-
-import java.io.IOException;
-
-import org.apache.pig.data.Tuple;
-import org.apache.pig.LoadFunc;
-
-/**
- * Implementing this interface indicates to Pig that a given loader can be
- * used by a sampling loader. The requirement for this is that the loader
- * can handle a getNext() call without knowing the position in the file.
- * This will not be the case for loaders that handle structured data such
- * as XML where they must start at the beginning of the file in order to
- * understand their position. Record oriented loaders such as PigStorage
- * can handle this by seeking to the next record delimiter and starting
- * from that point. Another requirement is that the loader be able to
- * skip or seek in its input stream.
- */
-public abstract class SamplableLoader extends LoadFunc {
-
- /**
- * Skip ahead in the input stream.
- * @param n number of bytes to skip
- * @return number of bytes actually skipped. The return semantics are
- * exactly the same as {@link java.io.InputStream#skip(long)}
- */
- public abstract long skip(long n) throws IOException;
-
- /**
- * Get the current position in the stream.
- * @return position in the stream.
- */
- public abstract long getPosition() throws IOException;
-
- /**
- * Get the next tuple from the stream starting from the current
- * read position.
- * The loader implementation should not assume that current read position
- * in the stream is at the beginning of a record since this method is called
- * for sampling and the current read position in the stream could be anywhere
- * in the stream.
- * @return the next tuple from underlying input stream or null if there are no more tuples
- * to be processed.
- */
- public abstract Tuple getSampledTuple() throws IOException;
-}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 23 20:27:30 2009
@@ -34,6 +34,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.builtin.BinStorage;
@@ -42,7 +43,6 @@
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
-import org.apache.pig.impl.builtin.MergeJoinIndexer;
import org.apache.pig.impl.builtin.GetMemNumRows;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -56,7 +56,6 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -1183,17 +1182,25 @@
String[] indexerArgs = new String[3];
FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
-
+ if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof OrderedLoadFunc)){
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + indexerArgs[0] + " doesn't implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
rightLoader.setLFile(lFile);
- // Loader of mro will return a tuple of form (key1, key2, ..,filename, offset)
+ // Loader of mro will return a tuple of form -
+ // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
// Now set up a POLocalRearrange which has "all" as the key and tuple fetched
// by loader as the "value" of POLocalRearrange
- // Sorting of index can possibly be achieved by using Hadoop sorting between map and reduce instead of Pig doing sort. If that is so,
+ // Sorting of index can possibly be achieved by using Hadoop sorting
+ // between map and reduce instead of Pig doing sort. If that is so,
// it will simplify lot of the code below.
PhysicalPlan lrPP = new PhysicalPlan();
@@ -1259,11 +1266,12 @@
rightMROpr.setReduceDone(true);
// set up the DefaultIndexableLoader for the join operator
- String[] defaultIndexableLoaderArgs = new String[4];
+ String[] defaultIndexableLoaderArgs = new String[5];
defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
defaultIndexableLoaderArgs[1] = strFile.getFileName();
defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+ defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=883486&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Mon Nov 23 20:27:30 2009
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/** Merge Join indexer is used to generate an on-the-fly index for doing Merge Join efficiently.
+ * It samples the first record from every block of the right side input
+ * and returns a tuple in the following format:
+ * (key0, key1,...,position,splitIndex)
+ * These tuples are then sorted before being written out to the index file on HDFS.
+ */
+public class MergeJoinIndexer extends LoadFunc{
+
+ private boolean firstRec = true;
+ private transient TupleFactory mTupleFactory;
+ private POLocalRearrange lr;
+ private PhysicalPlan precedingPhyPlan;
+ private int keysCnt;
+ private PhysicalOperator rightPipelineLeaf;
+ private PhysicalOperator rightPipelineRoot;
+ private Tuple dummyTuple = null;
+ private OrderedLoadFunc loader;
+ private PigSplit pigSplit = null;
+
+ /** @param funcSpec : Loader specification.
+ * @param innerPlan : This is the serialized version of the LR plan. We
+ * want to keep only keys in our index file and not the whole tuple. So, we need LR and thus its plan
+ * to get keys out of the sampled tuple.
+ * @param serializedPhyPlan Serialized physical plan on right side.
+ * @throws ExecException
+ */
+ @SuppressWarnings("unchecked")
+ public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
+
+ loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+ try {
+ List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
+ lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
+ lr.setPlans(innerPlans);
+ keysCnt = innerPlans.size();
+ precedingPhyPlan = (PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
+ if(precedingPhyPlan != null){
+ if(precedingPhyPlan.getLeaves().size() != 1 || precedingPhyPlan.getRoots().size() != 1){
+ int errCode = 2168;
+ String errMsg = "Expected physical plan with exactly one root and one leaf.";
+ throw new ExecException(errMsg,errCode,PigException.BUG);
+ }
+ this.rightPipelineLeaf = precedingPhyPlan.getLeaves().get(0);
+ this.rightPipelineRoot = precedingPhyPlan.getRoots().get(0);
+ this.rightPipelineRoot.setInputs(null);
+ }
+ }
+ catch (IOException e) {
+ int errCode = 2094;
+ String msg = "Unable to deserialize plans in Indexer.";
+ throw new ExecException(msg,errCode,e);
+ }
+ mTupleFactory = TupleFactory.getInstance();
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+
+ if(!firstRec) // We sample only one record per block.
+ return null;
+ WritableComparable<?> position = loader.getSplitComparable(pigSplit);
+ Object key = null;
+ Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+
+ while(true){
+ Tuple readTuple = loader.getNext();
+
+ if(null == readTuple){ // We hit the end.
+
+ for(int i =0; i < keysCnt; i++)
+ wrapperTuple.set(i, null);
+ wrapperTuple.set(keysCnt, position);
+ firstRec = false;
+ return wrapperTuple;
+ }
+
+ if (null == precedingPhyPlan){
+
+ lr.attachInput(readTuple);
+ key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+ lr.detachInput();
+ if ( null == key) // Tuple with null key. Drop it.
+ continue;
+ break;
+ }
+
+ // There is a physical plan.
+
+ rightPipelineRoot.attachInput(readTuple);
+ boolean fetchNewTup;
+
+ while(true){
+
+ Result res = rightPipelineLeaf.getNext(dummyTuple);
+ switch(res.returnStatus){
+
+ case POStatus.STATUS_OK:
+
+ lr.attachInput((Tuple)res.result);
+ key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+ lr.detachInput();
+ if ( null == key) // Tuple with null key. Drop it.
+ continue;
+ fetchNewTup = false;
+ break;
+
+ case POStatus.STATUS_EOP:
+ fetchNewTup = true;
+ break;
+
+ default:
+ int errCode = 2164;
+ String errMsg = "Expected EOP/OK as return status. Found:
"+res.returnStatus;
+ throw new ExecException(errMsg,errCode);
+ }
+ break;
+ }
+ if (!fetchNewTup)
+ break;
+ }
+
+ if(key instanceof Tuple){
+ Tuple tupKey = (Tuple)key;
+ for(int i =0; i < tupKey.size(); i++)
+ wrapperTuple.set(i, tupKey.get(i));
+ }
+
+ else
+ wrapperTuple.set(0, key);
+
+ wrapperTuple.set(keysCnt, position);
+ wrapperTuple.set(keysCnt+1, pigSplit.getSplitIndex());
+ firstRec = false;
+ return wrapperTuple;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getInputFormat()
+ */
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return loader.getInputFormat();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getLoadCaster()
+ */
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return loader.getLoadCaster();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
+ */
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ loader.prepareToRead(reader, split);
+ pigSplit = split;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ loader.setLocation(location, job);
+ }
+}
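To make the emitted record concrete: for a join on two key columns, each index entry built by getNext() has the shape (keyFirst1, keyFirst2, position, splitIndex). A hypothetical construction of one such entry (all values illustrative):

    import org.apache.pig.FileSplitComparable;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class IndexEntryShape {
        public static void main(String[] args) throws Exception {
            int keysCnt = 2;    // two join-key columns
            Tuple entry = TupleFactory.getInstance().newTuple(keysCnt + 2);
            entry.set(0, "alice");   // first key column of the split's first record
            entry.set(1, 42);        // second key column
            // position within the input, as returned by getSplitComparable()
            entry.set(keysCnt, new FileSplitComparable("/data/right/part-00000", 0L));
            entry.set(keysCnt + 1, 0);   // split index
            System.out.println(entry);
        }
    }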
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Mon Nov 23 20:27:30 2009
@@ -58,8 +58,6 @@
transient LoadFunc loader = null;
// The filespec on which the operator is based
FileSpec lFile;
- // The stream used to bind to by the loader
- transient InputStream is;
// PigContext passed to us by the operator creator
PigContext pc;
//Indicates whether the loader setup is done or not
@@ -107,13 +105,7 @@
ConfigurationUtil.toConfiguration(pc.getProperties()),
filename,
0);
-
- // XXX : FIXME need to get this to work with new loadfunc interface - the
- // below code is for merge join - hopefully we will no longer need it
- // and then we can just get rid of it and the rest should be fine.
- is = (this.offset == 0) ? FileLocalizer.open(filename, pc) : FileLocalizer.open(filename, this.offset,pc);
-
-// loader.bindTo(filename , new BufferedPositionedInputStream(is), this.offset, Long.MAX_VALUE);
+
}
@@ -123,7 +115,6 @@
* @throws IOException
*/
public void tearDown() throws IOException{
- is.close();
setUpDone = false;
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Mon Nov 23 20:27:30 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
@@ -54,8 +55,9 @@
import org.apache.pig.impl.io.BinStorageRecordWriter;
import org.apache.pig.impl.util.LogUtils;
-public class BinStorage extends LoadFunc
- implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc {
+public class BinStorage extends FileInputLoadFunc
+implements ReversibleLoadStoreFunc, LoadCaster, StoreFunc {
+
public static final int RECORD_1 = 0x01;
public static final int RECORD_2 = 0x02;
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Mon Nov 23 20:27:30 2009
@@ -37,6 +37,8 @@
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
@@ -55,8 +57,9 @@
* delimiter is given as a regular expression. See String.split(delimiter) and
* http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
*/
-public class PigStorage extends LoadFunc
- implements ReversibleLoadStoreFunc, StoreFunc {
+public class PigStorage
+extends FileInputLoadFunc
+implements ReversibleLoadStoreFunc, StoreFunc {
protected RecordReader in = null;
protected RecordWriter writer = null;
protected final Log mLog = LogFactory.getLog(getClass());
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataReaderWriter.java Mon Nov 23 20:27:30 2009
@@ -20,12 +20,14 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
import org.apache.pig.backend.executionengine.ExecException;
/**
@@ -98,6 +100,30 @@
return new String(ba, DataReaderWriter.UTF8);
}
+ public static Writable bytesToWritable(DataInput in) throws IOException {
+ String className = (String) readDatum(in);
+ // create the writable class. It needs to have a default constructor
+ Class<?> objClass = null ;
+ try {
+ objClass = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not find class " + className +
+ ", while attempting to de-serialize it ", e);
+ }
+ Writable writable = null;
+ try {
+ writable = (Writable) objClass.newInstance();
+ } catch (Exception e) {
+ String msg = "Could create instance of class " + className +
+ ", while attempting to de-serialize it. (no default constructor
?)";
+ throw new IOException(msg, e);
+ }
+
+ //read the fields of the object from DataInput
+ writable.readFields(in);
+ return writable;
+ }
+
public static Object readDatum(DataInput in) throws IOException, ExecException {
// Read the data type
byte b = in.readByte();
@@ -149,7 +175,10 @@
case DataType.CHARARRAY:
return bytesToCharArray(in);
-
+
+ case DataType.GENERIC_WRITABLECOMPARABLE :
+ return bytesToWritable(in);
+
case DataType.NULL:
return null;
@@ -261,6 +290,13 @@
}
break;
}
+ case DataType.GENERIC_WRITABLECOMPARABLE :
+ out.writeByte(DataType.GENERIC_WRITABLECOMPARABLE);
+ //store the class name, so we know the class to create on read
+ writeDatum(out, val.getClass().getName());
+ Writable writable = (Writable)val;
+ writable.write(out);
+ break;
case DataType.NULL:
out.writeByte(DataType.NULL);
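The new GENERIC_WRITABLECOMPARABLE path can be exercised directly: writeDatum() stores the class name followed by the object's own write() output, and readDatum() reflectively instantiates that class (hence the required default constructor) and calls readFields(). A hypothetical round trip using FileSplitComparable from this commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.pig.FileSplitComparable;
    import org.apache.pig.data.DataReaderWriter;

    public class WritableRoundTrip {
        public static void main(String[] args) throws Exception {
            FileSplitComparable pos = new FileSplitComparable("/data/part-00000", 128L);

            // serialize: type tag, class name, then the object's own fields
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataReaderWriter.writeDatum(new DataOutputStream(bos), pos);

            // deserialize: reflectively recreate the object and read it back
            DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            FileSplitComparable copy = (FileSplitComparable) DataReaderWriter.readDatum(in);
            System.out.println(copy);   // FileName: '/data/part-00000' Offset: 128
        }
    }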
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java Mon Nov 23 20:27:30 2009
@@ -24,8 +24,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
-import java.util.Set;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -62,6 +62,12 @@
public static final byte MAP = 100;
public static final byte TUPLE = 110;
public static final byte BAG = 120;
+
+ // internal use only; used to store WritableComparable objects
+ // for creating the ordered index in MergeJoin. Expecting an object that
+ // implements the Writable interface and has a default constructor
+ public static final byte GENERIC_WRITABLECOMPARABLE = 123;
+
public static final byte INTERNALMAP = 127; // internal use only; for maps that are object->object. Used by FindQuantiles.
public static final byte ERROR = -1;
@@ -86,6 +92,7 @@
else if (o instanceof Double) return DOUBLE;
else if (o instanceof Boolean) return BOOLEAN;
else if (o instanceof Byte) return BYTE;
+ else if (o instanceof WritableComparable) return GENERIC_WRITABLECOMPARABLE;
else {return ERROR;}
}
@@ -135,6 +142,7 @@
} else {
interfaces = ioeInterfaces;
}
+ boolean matchedWritableComparable = false;
for (int i = 0; i < interfaces.length; i++) {
if (interfaces[i].getName().equals("org.apache.pig.data.Tuple")) {
return TUPLE;
@@ -142,8 +150,13 @@
return BAG;
} else if (interfaces[i].getName().equals("java.util.Map")) {
return MAP;
- }
+ } else if (interfaces[i].getName().equals("org.apache.hadoop.io.WritableComparable")) {
+ // use GENERIC_WRITABLECOMPARABLE type only as last resort
+ matchedWritableComparable = true;
+ }
}
+ if(matchedWritableComparable)
+ return GENERIC_WRITABLECOMPARABLE;
return ERROR;
}
@@ -154,14 +167,19 @@
}
public static byte[] genAllTypes(){
byte[] types = { DataType.BAG, DataType.BIGCHARARRAY, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY,
- DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.INTERNALMAP,
+ DataType.CHARARRAY, DataType.DOUBLE, DataType.FLOAT,
+ DataType.GENERIC_WRITABLECOMPARABLE,
+ DataType.INTEGER, DataType.INTERNALMAP,
DataType.LONG, DataType.MAP, DataType.TUPLE};
return types;
}
private static String[] genAllTypeNames(){
- String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER",
- "INTERNALMAP", "LONG", "MAP", "TUPLE" };
+ String[] names = { "BAG", "BIGCHARARRAY", "BOOLEAN", "BYTE", "BYTEARRAY",
+ "CHARARRAY", "DOUBLE", "FLOAT",
+ "GENERIC_WRITABLECOMPARABLE",
+ "INTEGER","INTERNALMAP",
+ "LONG", "MAP", "TUPLE" };
return names;
}
@@ -215,6 +233,7 @@
case INTERNALMAP: return "internalmap";
case TUPLE: return "tuple";
case BAG: return "bag";
+ case GENERIC_WRITABLECOMPARABLE: return "generic_writablecomparable";
default: return "Unknown";
}
}
@@ -253,7 +272,8 @@
(dataType == FLOAT) ||
(dataType == DOUBLE) ||
(dataType == BOOLEAN) ||
- (dataType == BYTE));
+ (dataType == BYTE) ||
+ (dataType == GENERIC_WRITABLECOMPARABLE));
}
/**
@@ -366,6 +386,9 @@
}
}
+ case GENERIC_WRITABLECOMPARABLE:
+ return ((Comparable)o1).compareTo(o2);
+
case INTERNALMAP:
return -1; // Don't think anyway will want to do this.
@@ -374,6 +397,7 @@
case BAG:
return ((DataBag)o1).compareTo((DataBag)o2);
+
default:
throw new RuntimeException("Unkown type " + dt1 +
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Mon Nov 23 20:27:30 2009
@@ -18,14 +18,10 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.zip.GZIPInputStream;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,8 +30,8 @@
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.SeekableInputStream;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -44,17 +40,19 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.tools.bzip2r.CBZip2InputStream;
/**
- *
+ * Used by MergeJoin. Takes an index on sorted data
+ * consisting of sorted tuples of the form
+ * (key1,key2..., position,splitIndex) as input. For the key given in seekNear(Tuple),
+ * it finds the splitIndex that can contain the key and initializes a ReadToEndLoader
+ * to read from that splitIndex onwards, in the sequence of splits in the index.
*/
-//XXX FIXME - make this work with new load-store redesign
public class DefaultIndexableLoader extends IndexableLoadFunc {
@@ -70,20 +68,26 @@
private String scope;
private Tuple dummyTuple = null;
private transient TupleFactory mTupleFactory;
- private InputStream is;
- private String currentFileName;
+
+ private String inpLocation;
- public DefaultIndexableLoader(String loaderFuncSpec, String indexFile, String indexFileLoadFuncSpec, String scope) {
+ public DefaultIndexableLoader(
+ String loaderFuncSpec,
+ String indexFile,
+ String indexFileLoadFuncSpec,
+ String scope,
+ String inputLocation
+ ) {
this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec);
this.indexFile = indexFile;
this.indexFileLoadFuncSpec = indexFileLoadFuncSpec;
this.scope = scope;
+ this.inpLocation = inputLocation;
}
@SuppressWarnings("unchecked")
@Override
public void seekNear(Tuple keys) throws IOException{
-
// some setup
mTupleFactory = TupleFactory.getInstance();
@@ -100,7 +104,6 @@
// there are multiple Join keys, the tuple itself represents
// the join key
Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
-
POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)), false);
try {
pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
@@ -115,6 +118,7 @@
for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
index.offer((Tuple) res.result);
+
Tuple prevIdxEntry = null;
Tuple matchedEntry;
@@ -137,11 +141,14 @@
}
if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
+ index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead.
if(null == prevIdxEntry) // very first entry in index.
matchedEntry = curIdxEntry;
else{
- matchedEntry = prevIdxEntry;
- index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead.
+ matchedEntry = prevIdxEntry;
+ // start join from previous idx entry, it might have tuples
+ // with this key
+ index.addFirst(prevIdxEntry);
}
break;
}
@@ -168,29 +175,24 @@
throw new ExecException(errMsg,errCode,PigException.BUG);
}
}
- initRightLoader(matchedEntry);
- }
-
- private void initRightLoader(Tuple idxEntry) throws IOException{
-
- // bind loader to file pointed by this index Entry.
- int keysCnt = idxEntry.size();
- Long offset = (Long)idxEntry.get(keysCnt-1);
- if(offset > 0)
- // Loader will throw away one tuple if we are in the middle of the block. We don't want that.
- offset -= 1 ;
- FileSpec lFile = new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec);
- currentFileName = lFile.getFileName();
- loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
- is = FileLocalizer.open(currentFileName, offset, pc);
- if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) {
- is = new CBZip2InputStream((SeekableInputStream)is, 9);
- } else if (currentFileName.endsWith(".gz")) {
- is = new GZIPInputStream(is);
- }
-
-
-// loader.bindTo(currentFileName , new BufferedPositionedInputStream(is), offset, Long.MAX_VALUE);
+ //add remaining split indexes to splitsAhead array
+ int [] splitsAhead = new int[index.size()];
+ int splitsAheadIdx = 0;
+ for(Tuple t : index){
+ splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
+ }
+
+ initRightLoader(splitsAhead);
+ }
+
+ private void initRightLoader(int [] splitsToBeRead) throws IOException{
+ //create ReadToEndLoader that will read the given splits in order
+ loader = new ReadToEndLoader(
+ (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
+ ConfigurationUtil.toConfiguration(pc.getProperties()),
+ inpLocation,
+ splitsToBeRead
+ );
}
private Object extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{
@@ -200,8 +202,9 @@
if(idxTupSize == 3)
return idxTuple.get(0);
- List<Object> list = new ArrayList<Object>(idxTupSize-2);
- for(int i=0; i<idxTupSize-2;i++)
+ int numColsInKey = (idxTupSize - 2);
+ List<Object> list = new ArrayList<Object>(numColsInKey);
+ for(int i=0; i < numColsInKey; i++)
list.add(idxTuple.get(i));
return mTupleFactory.newTupleNoCopy(list);
@@ -214,27 +217,11 @@
@Override
public Tuple getNext() throws IOException {
Tuple t = loader.getNext();
- if(t == null) {
- while(true){ // But next file may be same as previous one, because index may contain multiple entries for same file.
- Tuple idxEntry = index.poll();
- if(null == idxEntry) { // Index is finished too. Right stream is finished. No more tuples.
- return null;
- } else {
- if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) {
- continue;
- } else {
- initRightLoader(idxEntry); // bind loader to file and get tuple from it.
- return loader.getNext();
- }
- }
- }
- }
return t;
}
@Override
public void close() throws IOException {
- is.close();
}
@Override
@@ -244,16 +231,23 @@
}
@Override
- public InputFormat getInputFormat() {
- return null;
+ public InputFormat getInputFormat() throws IOException {
+ throw new UnsupportedOperationException();
}
@Override
+ public LoadCaster getLoadCaster() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void prepareToRead(RecordReader reader, PigSplit split) {
+ throw new UnsupportedOperationException();
}
@Override
- public void setLocation(String location, Job job) throws IOException {
+ public void setLocation(String location, Job job) throws IOException {
+ throw new UnsupportedOperationException();
}
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=883486&r1=883485&r2=883486&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Mon Nov 23 20:27:30 2009
@@ -22,7 +22,6 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -67,13 +66,18 @@
* the input location string (typically input file/dir name )
*/
private String inputLocation;
+
+ /**
+ * If the splits to be read are not a simple increasing sequence of integers,
+ * this array can be used to list them explicitly
+ */
+ private int[] toReadSplits = null;
/**
- * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)})
- * to start reading from
+ * index into toReadSplits
*/
- private int startSplitIndex;
-
+ private int toReadSplitsIdx = 0;
+
/**
* the index of the split the loader is currently reading from
*/
@@ -82,60 +86,86 @@
/**
* the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
*/
- private List<InputSplit> splits;
+ private List<InputSplit> inpSplits = null;
/**
* underlying RecordReader
*/
- private RecordReader reader;
+ private RecordReader reader = null;
/**
* underlying InputFormat
*/
- private InputFormat inputFormat;
+ private InputFormat inputFormat = null;
/**
* @param wrappedLoadFunc
* @param conf
* @param inputLocation
- * @param startSplitIndex
+ * @param splitIndex
* @throws IOException
* @throws InterruptedException
*/
public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
- String inputLocation, int startSplitIndex) throws IOException {
+ String inputLocation, int splitIndex) throws IOException {
this.wrappedLoadFunc = wrappedLoadFunc;
+ this.inputLocation = inputLocation;
+ this.conf = conf;
+ this.curSplitIndex = splitIndex;
+ init();
+ }
+
+ /**
+ * This constructor takes an array of split indexes (toReadSplitIdxs) of the
+ * splits to be read.
+ * @param wrappedLoadFunc
+ * @param conf
+ * @param inputLocation
+ * @param toReadSplitIdxs
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+ String inputLocation, int[] toReadSplitIdxs) throws IOException {
+ this.wrappedLoadFunc = wrappedLoadFunc;
+ this.inputLocation = inputLocation;
+ this.toReadSplits = toReadSplitIdxs;
+ this.conf = conf;
+ this.curSplitIndex =
+ toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE;
+ init();
+ }
+
+ private void init() throws IOException {
// make a copy so that if the underlying InputFormat writes to the
// conf, we don't affect the caller's copy
- this.conf = new Configuration(conf);
- this.inputLocation = inputLocation;
- this.startSplitIndex = startSplitIndex;
- this.curSplitIndex = startSplitIndex;
-
+ conf = new Configuration(conf);
// let's initialize the wrappedLoadFunc
- Job job = new Job(this.conf);
- wrappedLoadFunc.setLocation(this.inputLocation,
+ Job job = new Job(conf);
+ wrappedLoadFunc.setLocation(inputLocation,
job);
// The above setLocation call could write to the conf within
// the job - get a hold of the modified conf
- this.conf = job.getConfiguration();
+ conf = job.getConfiguration();
inputFormat = wrappedLoadFunc.getInputFormat();
try {
- splits = inputFormat.getSplits(new JobContext(this.conf,
+ inpSplits = inputFormat.getSplits(new JobContext(conf,
new JobID()));
} catch (InterruptedException e) {
throw new IOException(e);
- }
+ }
}
-
+
private boolean initializeReader() throws IOException, InterruptedException {
- if(curSplitIndex > splits.size() - 1) {
+ if(curSplitIndex > inpSplits.size() - 1) {
// past the last split, we are done
return false;
}
-
- InputSplit curSplit = splits.get(curSplitIndex);
+ if(reader != null){
+ reader.close();
+ }
+ InputSplit curSplit = inpSplits.get(curSplitIndex);
TaskAttemptContext tAContext = new TaskAttemptContext(conf,
new TaskAttemptID());
reader = inputFormat.createRecordReader(curSplit, tAContext);
@@ -164,8 +194,7 @@
}
// if loadfunc returned null, we need to read next split
// if there is one
- reader.close();
- curSplitIndex++;
+ updateCurSplitIndex();
return getNextHelper();
}
} catch (InterruptedException e) {
@@ -179,22 +208,41 @@
t = wrappedLoadFunc.getNext();
if(t == null) {
// try next split
- curSplitIndex++;
+ updateCurSplitIndex();
} else {
return t;
}
}
return null;
}
+
+
+ /**
+ * Updates curSplitIndex: just increments it if toReadSplits is null,
+ * else advances to the next split index in toReadSplits
+ */
+ private void updateCurSplitIndex() {
+ if(toReadSplits == null){
+ ++curSplitIndex;
+ }else{
+ ++toReadSplitsIdx;
+ if(toReadSplitsIdx >= toReadSplits.length){
+ // finished all the splits in splitIndexes array
+ curSplitIndex = Integer.MAX_VALUE;
+ }else{
+ curSplitIndex = toReadSplits[toReadSplitsIdx];
+ }
+ }
+ }
@Override
- public InputFormat getInputFormat() {
- return null;
+ public InputFormat getInputFormat() throws IOException {
+ throw new UnsupportedOperationException();
}
@Override
- public LoadCaster getLoadCaster() {
- return null;
+ public LoadCaster getLoadCaster() throws IOException {
+ throw new UnsupportedOperationException();
}
@Override
@@ -206,5 +254,7 @@
public void setLocation(String location, Job job) throws IOException {
throw new UnsupportedOperationException();
}
+
+
}
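The new int[] constructor is what lets DefaultIndexableLoader hand over exactly the splits named in the index. A hypothetical standalone use (path and split numbers are illustrative; assumes PigStorage works as a wrapped LoadFunc on this branch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.io.ReadToEndLoader;

    public class ReadSelectedSplits {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // read only splits 3, 4 and 7 of the input, in that order;
            // getNext() closes one split's reader and opens the next
            ReadToEndLoader loader = new ReadToEndLoader(
                    new PigStorage(), conf, "/data/sorted-input", new int[] {3, 4, 7});
            for (Tuple t = loader.getNext(); t != null; t = loader.getNext()) {
                System.out.println(t);
            }
        }
    }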