Author: pradeepkth
Date: Thu Aug 20 18:08:32 2009
New Revision: 806281
URL: http://svn.apache.org/viewvc?rev=806281&view=rev
Log:
PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 20 18:08:32 2009
@@ -28,6 +28,8 @@
IMPROVEMENTS
+PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)
+
PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)
PIG-893: Added string -> integer, long, float, and double casts (zjffdu via
gates).
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Thu Aug 20 18:08:32 2009
@@ -34,6 +34,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
+import org.apache.pig.SamplableLoader;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -1065,6 +1066,7 @@
// We will first operate on right side which is indexer job.
// First yank plan of the compiled right input and set that as an
inner plan of right operator.
+ PhysicalPlan rightPipelinePlan;
if(!rightMROpr.mapDone){
PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
if(rightMapPlan.getRoots().size() != 1){
@@ -1082,14 +1084,13 @@
if (rightMapPlan.getSuccessors(rightLoader) == null ||
rightMapPlan.getSuccessors(rightLoader).isEmpty())
// Load - Join case.
- joinOp.setupRightPipeline(null);
+ rightPipelinePlan = null;
else{ // We got something on right side. Yank it and set it as
inner plan of right input.
- PhysicalPlan rightPipelinePlan = rightMapPlan.clone();
+ rightPipelinePlan = rightMapPlan.clone();
PhysicalOperator root =
rightPipelinePlan.getRoots().get(0);
rightPipelinePlan.disconnect(root,
rightPipelinePlan.getSuccessors(root).get(0));
rightPipelinePlan.remove(root);
- joinOp.setupRightPipeline(rightPipelinePlan);
rightMapPlan.trimBelow(rightLoader);
}
}
@@ -1097,12 +1098,12 @@
else if(!rightMROpr.reduceDone){
// Indexer must run in map. If we are in reduce, close it and
start new MROper.
// No need of yanking in this case. Since we are starting
brand new MR Operator and it will contain nothing.
- joinOp.setupRightPipeline(null);
POStore rightStore = getStore();
FileSpec rightStrFile = getTempFileSpec();
rightStore.setSFile(rightStrFile);
rightMROpr.setReduceDone(true);
rightMROpr = startNew(rightStrFile, rightMROpr);
+ rightPipelinePlan = null;
}
else{
@@ -1111,15 +1112,23 @@
throw new PlanException(msg, errCode, PigException.BUG);
}
+ joinOp.setupRightPipeline(rightPipelinePlan);
+
// At this point, we must be operating on map plan of right input
and it would contain nothing else other than a POLoad.
POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
// Replace POLoad with indexer.
- String[] indexerArgs = new String[2];
- indexerArgs[0] = rightLoader.getLFile().getFuncName();
+ String[] indexerArgs = new String[3];
+ indexerArgs[0] = rightLoader.getLFile().getFuncSpec().toString();
+ if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0])
instanceof SamplableLoader)){
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement
SamplableLoader interface. The specified loader " + indexerArgs[0] + " doesn't
implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
- indexerArgs[1] =
ObjectSerializer.serialize((Serializable)rightInpPlans);
+ indexerArgs[1] =
ObjectSerializer.serialize((Serializable)rightInpPlans);
+ indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
FileSpec lFile = new
FileSpec(rightLoader.getLFile().getFileName(),new
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
rightLoader.setLFile(lFile);
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
Thu Aug 20 18:08:32 2009
@@ -32,8 +32,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
@@ -49,16 +47,17 @@
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
+/** This operator implements merge join algorithm to do map side joins.
+ * Currently, only two-way joins are supported. One input of join is
identified as left
+ * and other is identified as right. Left input tuples are the input records
in map.
+ * Right tuples are read from HDFS by opening right stream.
+ *
+ * This join doesn't support outer join.
+ * Data is assumed to be sorted in ascending order. It will fail if data is
sorted in descending order.
+ */
+
public class POMergeJoin extends PhysicalOperator {
- /** This operator implements merge join algorithm to do map side joins.
- * Currently, only two-way joins are supported. One input of join is
identified as left
- * and other is identified as right. Left input tuples are the input
records in map.
- * Right tuples are read from HDFS by opening right stream.
- *
- * This join doesn't support outer join.
- * Data is assumed to be sorted in ascending order. It will fail if
data is sorted in descending order.
- */
private static final long serialVersionUID = 1L;
private final transient Log log = LogFactory.getLog(getClass());
@@ -119,10 +118,6 @@
private int arrayListSize = 1024;
- private List<POCast> casters;
-
- private List<POProject> projectors;
-
/**
* @param k
* @param rp
@@ -140,35 +135,8 @@
mTupleFactory = TupleFactory.getInstance();
leftTuples = new ArrayList<Tuple>(arrayListSize);
this.createJoinPlans(inpPlans,keyTypes);
- setUpTypeCastingForIdxTup(keyTypes.get(0));
}
- /** This function setups casting for key tuples which we read out of index
file.
- * We set the type of key as DataByteArray(DBA) and then cast it into the
type specified in schema.
- * If type is not specified in schema, then we will cast from DBA to DBA.
- */
-
- private void setUpTypeCastingForIdxTup(List<Byte> keyTypes){
- /*
- * Cant reuse one POCast operator for all keys since POCast maintains
some state
- * and hence its not safe to use one POCast. Thus we use one POCast
for each key.
- */
- casters = new ArrayList<POCast>(keyTypes.size());
- projectors = new ArrayList<POProject>(keyTypes.size());
-
- for(Byte keytype : keyTypes){
- POCast caster = new POCast(genKey());
- List<PhysicalOperator> pp = new ArrayList<PhysicalOperator>(1);
- POProject projector = new POProject(genKey());
- projector.setResultType(DataType.BYTEARRAY);
- projector.setColumn(0);
- pp.add(projector);
- caster.setInputs(pp);
- caster.setResultType(keytype);
- projectors.add(projector);
- casters.add(caster);
- }
- }
/**
* Configures the Local Rearrange operators to get keys out of tuple.
* @throws ExecException
@@ -504,50 +472,15 @@
private Object extractKeysFromIdxTuple(Tuple idxTuple) throws
ExecException{
int idxTupSize = idxTuple.size();
- List<Object> list = new ArrayList<Object>(idxTupSize-2);
-
- for(int i=0; i<idxTupSize-2; i++){
-
-
projectors.get(i).attachInput(mTupleFactory.newTuple(idxTuple.get(i)));
- switch (casters.get(i).getResultType()) {
-
- case DataType.BYTEARRAY: // POCast doesn't handle DBA. But we
are saved, because in this case we don't need cast anyway.
- list.add(idxTuple.get(i));
- break;
-
- case DataType.CHARARRAY:
- list.add(casters.get(i).getNext(dummyString).result);
- break;
-
- case DataType.INTEGER:
- list.add(casters.get(i).getNext(dummyInt).result);
- break;
-
- case DataType.FLOAT:
- list.add(casters.get(i).getNext(dummyFloat).result);
- break;
-
- case DataType.DOUBLE:
- list.add(casters.get(i).getNext(dummyDouble).result);
- break;
-
- case DataType.LONG:
- list.add(casters.get(i).getNext(dummyLong).result);
- break;
-
- case DataType.TUPLE:
- list.add(casters.get(i).getNext(dummyTuple).result);
- break;
+ if(idxTupSize == 3)
+ return idxTuple.get(0);
+
+ List<Object> list = new ArrayList<Object>(idxTupSize-2);
+ for(int i=0; i<idxTupSize-2;i++)
+ list.add(idxTuple.get(i));
- default:
- int errCode = 2036;
- String errMsg = "Unhandled key type :
"+casters.get(i).getResultType();
- throw new ExecException(errMsg,errCode,PigException.BUG);
- }
- }
- // If there is only one key, we don't want to wrap it into Tuple.
- return list.size() == 1 ? list.get(0) : mTupleFactory.newTuple(list);
+ return mTupleFactory.newTupleNoCopy(list);
}
private Result getNextRightInp() throws ExecException{
@@ -623,7 +556,11 @@
// bind loader to file pointed by this index Entry.
int keysCnt = idxEntry.size();
- rightLoader = new POLoad(genKey(), new
FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec),(Long)idxEntry.get(keysCnt-1),
false);
+ Long offset = (Long)idxEntry.get(keysCnt-1);
+ if(offset > 0)
+ // Loader will throw away one tuple if we are in the middle of the
block. We don't want that.
+ offset -= 1 ;
+ rightLoader = new POLoad(genKey(), new
FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec),offset,
false);
rightLoader.setPc(pc);
}
@@ -673,8 +610,6 @@
public void setRightLoaderFuncSpec(FuncSpec rightLoaderFuncSpec) {
this.rightLoaderFuncSpec = rightLoaderFuncSpec;
- for(POCast caster : casters)
- caster.setLoadFSpec(rightLoaderFuncSpec);
}
public List<PhysicalPlan> getInnerPlansOf(int index) {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java Thu
Aug 20 18:08:32 2009
@@ -18,119 +18,216 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.List;
+import java.util.Map;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.data.DataType;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.ObjectSerializer;
-public class MergeJoinIndexer extends RandomSampleLoader {
+/** Merge Join indexer is used to generate on the fly index for doing Merge
Join efficiently.
+ * It samples the first record from every block of the right side input
+ * and returns a tuple in the following format:
+ * (key0, key1,...,fileName, offset)
+ * These tuples are then sorted before being written out to index file on
HDFS.
+ */
+
+public class MergeJoinIndexer implements LoadFunc{
- /** Merge Join indexer is used to generate on the fly index for doing
Merge Join efficiently.
- * It samples first record from every block of right side input (which is
later opened as side file in merge join)
- * and returns tuple in the following format :
- * (key0, key1,...,fileName, offset)
- * These tuples are then sorted before being written out to index file on
HDFS.
- */
-
private boolean firstRec = true;
private transient TupleFactory mTupleFactory;
private String fileName;
private POLocalRearrange lr;
+ private PhysicalPlan precedingPhyPlan;
private int keysCnt;
-
+ private PhysicalOperator rightPipelineLeaf;
+ private PhysicalOperator rightPipelineRoot;
+ private Tuple dummyTuple = null;
+ private SamplableLoader loader;
+
/** @param funcSpec : Loader specification.
- * @param serializedPlans : This is serialized version of LR plan. We
+ * @param innerPlan : This is serialized version of LR plan. We
* want to keep only keys in our index file and not the whole tuple. So,
we need LR and thus its plan
* to get keys out of the sampled tuple.
+ * @param serializedPhyPlan Serialized physical plan on right side.
+ * @throws ExecException
*/
@SuppressWarnings("unchecked")
- public MergeJoinIndexer(String funcSpec, String serializedPlans) throws
ExecException{
- super(funcSpec,"1");
-
+ public MergeJoinIndexer(String funcSpec, String innerPlan, String
serializedPhyPlan) throws ExecException{
+
+ loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
try {
- List<PhysicalPlan> innerPlans =
(List<PhysicalPlan>)ObjectSerializer.deserialize(serializedPlans);
+ List<PhysicalPlan> innerPlans =
(List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
lr = new POLocalRearrange(new OperatorKey("MergeJoin
Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
lr.setPlans(innerPlans);
keysCnt = innerPlans.size();
- mTupleFactory = TupleFactory.getInstance();
- }
- catch (PlanException pe) {
- int errCode = 2071;
- String msg = "Problem with setting up local rearrange's plans.";
- throw new ExecException(msg, errCode, PigException.BUG, pe);
+ precedingPhyPlan =
(PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
+ if(precedingPhyPlan != null){
+ if(precedingPhyPlan.getLeaves().size() != 1 ||
precedingPhyPlan.getRoots().size() != 1){
+ int errCode = 2168;
+ String errMsg = "Expected physical plan with exactly
one root and one leaf.";
+ throw new
ExecException(errMsg,errCode,PigException.BUG);
+ }
+ this.rightPipelineLeaf = precedingPhyPlan.getLeaves().get(0);
+ this.rightPipelineRoot = precedingPhyPlan.getRoots().get(0);
+ this.rightPipelineRoot.setInputs(null);
+ }
}
catch (IOException e) {
int errCode = 2094;
- String msg = "Unable to deserialize inner plans in Indexer.";
+ String msg = "Unable to deserialize plans in Indexer.";
throw new ExecException(msg,errCode,e);
}
+ mTupleFactory = TupleFactory.getInstance();
}
@Override
public void bindTo(String fileName, BufferedPositionedInputStream is,long
offset, long end) throws IOException {
this.fileName = fileName;
- super.bindTo(fileName, is, offset, end);
+ loader.bindTo(fileName, is, offset, end);
}
@Override
public Tuple getNext() throws IOException {
- if(!firstRec) // We sample only record per block.
+ if(!firstRec) // We sample only one record per block.
return null;
+ long curPos;
+ Object key = null;
+ Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+
while(true){
- long initialPos = loader.getPosition();
- Tuple t = loader.getSampledTuple();
-
- if(null == t){ // We hit the end of block because all
keys are null.
-
- Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+ curPos = loader.getPosition();
+ Tuple readTuple = loader.getNext();
+
+ if(null == readTuple){ // We hit the end.
+
for(int i =0; i < keysCnt; i++)
wrapperTuple.set(i, null);
wrapperTuple.set(keysCnt, fileName);
- wrapperTuple.set(keysCnt+1, initialPos);
+ wrapperTuple.set(keysCnt+1, curPos);
firstRec = false;
return wrapperTuple;
}
-
- Tuple dummyTuple = null;
- lr.attachInput(t);
- Object key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
- if(null == key) // Tuple with null key. Drop it. Get next.
- continue;
-
- Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
- if(key instanceof Tuple){
- Tuple tupKey = (Tuple)key;
- for(int i =0; i < tupKey.size(); i++)
- wrapperTuple.set(i, tupKey.get(i));
+
+ if (null == precedingPhyPlan){
+
+ lr.attachInput(readTuple);
+ key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+ lr.detachInput();
+ if ( null == key) // Tuple with null key. Drop it.
+ continue;
+ break;
}
- else
- wrapperTuple.set(0, key);
+ // There is a physical plan.
- lr.detachInput();
- wrapperTuple.set(keysCnt, fileName);
- wrapperTuple.set(keysCnt+1, initialPos);
+ rightPipelineRoot.attachInput(readTuple);
+ boolean fetchNewTup;
- firstRec = false;
- return wrapperTuple;
+ while(true){
+
+ Result res = rightPipelineLeaf.getNext(dummyTuple);
+ switch(res.returnStatus){
+
+ case POStatus.STATUS_OK:
+
+ lr.attachInput((Tuple)res.result);
+ key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+ lr.detachInput();
+ if ( null == key) // Tuple with null key. Drop it.
+ continue;
+ fetchNewTup = false;
+ break;
+
+ case POStatus.STATUS_EOP:
+ fetchNewTup = true;
+ break;
+
+ default:
+ int errCode = 2164;
+ String errMsg = "Expected EOP/OK as return status. Found:
"+res.returnStatus;
+ throw new ExecException(errMsg,errCode);
+ }
+ break;
+ }
+ if (!fetchNewTup)
+ break;
}
+
+ if(key instanceof Tuple){
+ Tuple tupKey = (Tuple)key;
+ for(int i =0; i < tupKey.size(); i++)
+ wrapperTuple.set(i, tupKey.get(i));
+ }
+
+ else
+ wrapperTuple.set(0, key);
+
+ wrapperTuple.set(keysCnt, fileName);
+ wrapperTuple.set(keysCnt+1, curPos);
+ firstRec = false;
+ return wrapperTuple;
+ }
+
+ public Integer bytesToInteger(byte[] b) throws IOException {
+ return loader.bytesToInteger(b);
}
- private void readObject(ObjectInputStream is) throws IOException,
ClassNotFoundException, ExecException{
- is.defaultReadObject();
- mTupleFactory = TupleFactory.getInstance();
+ public Long bytesToLong(byte[] b) throws IOException {
+ return loader.bytesToLong(b);
+ }
+
+ public Float bytesToFloat(byte[] b) throws IOException {
+ return loader.bytesToFloat(b);
+ }
+
+ public Double bytesToDouble(byte[] b) throws IOException {
+ return loader.bytesToDouble(b);
+ }
+
+ public String bytesToCharArray(byte[] b) throws IOException {
+ return loader.bytesToCharArray(b);
+ }
+
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+ return loader.bytesToMap(b);
+ }
+
+ public Tuple bytesToTuple(byte[] b) throws IOException {
+ return loader.bytesToTuple(b);
+ }
+
+ public DataBag bytesToBag(byte[] b) throws IOException {
+ return loader.bytesToBag(b);
+ }
+
+ public void fieldsToRead(Schema schema) {
+ loader.fieldsToRead(schema);
+ }
+
+ public Schema determineSchema(
+ String fileName,
+ ExecType execType,
+ DataStorage storage) throws IOException {
+ return loader.determineSchema(fileName, execType, storage);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Thu Aug 20
18:08:32 2009
@@ -407,6 +407,37 @@
}
@Test
+ public void testIndexer() throws IOException{
+ Util.createInputFile(cluster, "temp_file1", new String[]{1+""});
+ Util.createInputFile(cluster, "temp_file2", new String[]{2+""});
+ Util.createInputFile(cluster, "temp_file3", new String[]{10+""});
+ pigServer.registerQuery("A = LOAD 'temp_file*' as (a:int);");
+ pigServer.registerQuery("B = LOAD 'temp_file*' as (a:int);");
+ DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj =
BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0 using
\"merge\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbmrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0, B by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Util.deleteFile(cluster, "temp_file1");
+ Util.deleteFile(cluster, "temp_file2");
+ Util.deleteFile(cluster, "temp_file3");
+ Assert.assertEquals(dbmrj.size(),dbshj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbmrj, dbshj));
+ }
+
+ @Test
public void testMergeJoinSch1() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=806281&r1=806280&r2=806281&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Thu
Aug 20 18:08:32 2009
@@ -1,23 +1,23 @@
-MapReduce(-1,PigStorage) - scope-127:
+MapReduce(-1,PigStorage) - scope-125:
Reduce Plan Empty
-| Store(file:/tmp:org.apache.pig.builtin.PigStorage) - scope-126
+| Store(file:/tmp:org.apache.pig.builtin.PigStorage) - scope-124
| |
| |---MergeJoin[tuple] - scope-121
| |
| |---Load(file:/tmp/input1:org.apache.pig.builtin.PigStorage) -
scope-117
|
-|---MapReduce(-1,PigStorage) - scope-128:
- |
Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage)
- scope-135
+|---MapReduce(-1,PigStorage) - scope-126:
+ |
Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage)
- scope-133
| |
- | |---POSort[tuple]() - scope-134
+ | |---POSort[tuple]() - scope-132
| | |
- | | Project[tuple][*] - scope-133
+ | | Project[tuple][*] - scope-131
| |
- | |---Project[tuple][1] - scope-132
+ | |---Project[tuple][1] - scope-130
| |
- | |---Package[tuple]{chararray} - scope-131
- | Local Rearrange[tuple]{chararray}(false) - scope-130
+ | |---Package[tuple]{chararray} - scope-129
+ | Local Rearrange[tuple]{chararray}(false) - scope-128
| | |
- | | Constant(all) - scope-129
+ | | Constant(all) - scope-127
| |
- |
|---Load(file:/tmp/input2:org.apache.pig.impl.builtin.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaabfkaaangfgogeepggebgmgmejgohahfhehihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoepcpjdmpihcaifknacaaagemaaakgneghcgpgnefgeghgfhdheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlemaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaignfegpefgeghgfhdhbaahoaaaehihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbh
ggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaahiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghfkaaaehdhegbhcemaaalgcgbghejhegfhcgbhegphcheaabeemgkgbhggbcphfhegjgmcpejhegfhcgbhegphcdlemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhedlhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhd
gjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaadgmgpghheaacaemgphcghcpgbhagbgdgigfcpgdgpgngngpgohdcpgmgpghghgjgoghcpemgpghdlhihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaakfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaaadgmgpghhbaahoaabfemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgig
fcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaappppppppdchahahahdhcaaclgphcghcogbhagbgdgigfcogdgpgngngpgohdcogmgpghghgjgoghcogjgnhagmcoemgpghdeekemgpghghgfhccikmpnoicknfncdiacaaabemaaaegogbgngfhbaahoaaaohihaheaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdhehahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachahbaahoaabpaaaaaaaahahdhbaahoaaaaaaaaaaabhhaeaaaaaaabhdhcaa
bbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaabnhihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaabnhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahi'))
- scope-118
\ No newline at end of file
+ |
|---Load(file:/tmp/input2:org.apache.pig.impl.builtin.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaabfkaaangfgogeepggebgmgmejgohahfhehihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoepcpjdmpihcaifknacaaagemaaakgneghcgpgnefgeghgfhdheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlemaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaignfegpefgeghgfhdhbaahoaaaehihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbh
ggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaahiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghfkaaaehdhegbhcemaaalgcgbghejhegfhcgbhegphcheaabeemgkgbhggbcphfhegjgmcpejhegfhcgbhegphcdlemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhedlhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhd
gjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaadgmgpghheaacaemgphcghcpgbhagbgdgigfcpgdgpgngngpgohdcpgmgpghghgjgoghcpemgpghdlhihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaakfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaaadgmgpghhbaahoaabfemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgig
fcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaappppppppdchahahahdhcaaclgphcghcogbhagbgdgigfcogdgpgngngpgohdcogmgpghghgjgoghcogjgnhagmcoemgpghdeekemgpghghgfhccikmpnoicknfncdiacaaabemaaaegogbgngfhbaahoaaaohihaheaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdhehahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachahbaahoaabpaaaaaaaahahdhbaahoaaaaaaaaaaabhhaeaaaaaaabhdhcaa
bbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaabnhihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaabnhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahi',''))
- scope-118
\ No newline at end of file