Author: pradeepkth
Date: Tue Nov 3 23:27:41 2009
New Revision: 832599
URL: http://svn.apache.org/viewvc?rev=832599&view=rev
Log:
PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)
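For readers coming to this change cold: PIG-1036 allows a fragment-replicate ("replicated") join to be declared as a left outer join. Below is a minimal client-side sketch of the new usage, modeled on the tests added in this patch; the file names users.txt and orders.txt and the local-mode setup are hypothetical, not part of the commit.

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class FRLeftOuterJoinExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical setup; the tests below run against a MiniCluster instead.
            PigServer pigServer = new PigServer(ExecType.LOCAL);

            pigServer.registerQuery("A = LOAD 'users.txt' as (x:int, y:int);");
            pigServer.registerQuery("B = LOAD 'orders.txt' as (x:int, y:int);");
            // New in this patch: 'left' is now accepted together with "replicated".
            pigServer.registerQuery("C = join A by x left, B by x using \"replicated\";");

            Iterator<Tuple> iter = pigServer.openIterator("C");
            while (iter.hasNext()) {
                // Rows of A without a match in B come back with nulls in B's columns.
                System.out.println(iter.next());
            }
        }
    }

Note that the replicated (right-hand) input must declare a schema; the translator change below raises error 1109 otherwise.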
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Nov 3 23:27:41 2009
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)
+
PIG-920: optimizing diamond queries (rding via pradeepkth)
PIG-1040: FINDBUGS: MS_SHOULD_BE_FINAL: Field isn't final but should be
(olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Nov 3 23:27:41 2009
@@ -880,15 +880,49 @@
}
}
logToPhyMap.put(loj, skj);
- }
-
+ }
else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
int fragment = 0;
POFRJoin pfrj;
try {
+ boolean []innerFlags = loj.getInnerFlags();
+ boolean isLeftOuter = false;
+ // We don't check for bounds issues as we assume that a join
+ // involves at least two inputs
+ isLeftOuter = !innerFlags[1];
+
+ Tuple nullTuple = null;
+ if( isLeftOuter ) {
+ try {
+ // We know that in a left outer join it's only a two-way
+ // join, so we assume index of 1 for the right input
+ Schema inputSchema = inputs.get(1).getSchema();
+
+ // We check if we have a schema before the join
+ if(inputSchema == null) {
+ int errCode = 1109;
+ String msg = "Input (" +
inputs.get(1).getAlias() + ") " +
+ "on which outer join is desired should have a
valid schema";
+ throw new
LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+ }
+
+ // Using the schema we decide the number of columns/fields
+ // in the nullTuple
+ nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
+ for(int j = 0; j < inputSchema.size(); j++) {
+ nullTuple.set(j, null);
+ }
+
+ } catch( FrontendException e ) {
+ int errCode = 2104;
+ String msg = "Error while determining the schema of
input";
+ throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, e);
+ }
+ }
+
pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
- inp, ppLists, keyTypes, null, fragment);
+ inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
} catch (ExecException e1) {
int errCode = 2058;
String msg = "Unable to set index on newly create
POLocalRearrange.";
@@ -1073,13 +1107,13 @@
if(inputSchema == null) {
- int errCode = 1105;
+ int errCode = 1109;
String msg = "Input (" + joinInput.getAlias() + ") " +
"on which outer join is desired should have a valid
schema";
throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.INPUT);
}
} catch (FrontendException e) {
- int errCode = 2014;
+ int errCode = 2104;
String msg = "Error while determining the schema of input";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
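The hunk above implements the outer-join padding: unmatched left rows are completed with a tuple of nulls whose width is taken from the right input's declared schema. A standalone sketch of just that step, using Pig's TupleFactory and Schema but a hypothetical helper name:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    // Hypothetical helper mirroring the translator logic above: build the tuple
    // of nulls that stands in for the replicated (right) input when a key from
    // the fragmented (left) input has no match.
    final class NullTupleSketch {
        static Tuple nullTupleFor(Schema rightSchema) throws Exception {
            if (rightSchema == null) {
                // The real code throws error 1109 here: without a schema the
                // width of the null tuple is unknown.
                throw new IllegalArgumentException(
                        "input on which outer join is desired should have a valid schema");
            }
            // One null per declared column of the right input.
            Tuple nullTuple = TupleFactory.getInstance().newTuple(rightSchema.size());
            for (int j = 0; j < rightSchema.size(); j++) {
                nullTuple.set(j, null);
            }
            return nullTuple;
        }
    }

The translator then hands this tuple to the POFRJoin constructor, which wraps it in a single-tuple bag (nullBag below) for the cross-product.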
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Tue Nov 3 23:27:41 2009
@@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
@@ -39,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -49,19 +49,18 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-
/**
- * The operator models the join keys using the Local Rearrange operators which
- * are configured with the plan specified by the user. It also sets up
- * one Hashtable per replicated input which maps the Key(k) stored as a Tuple
- * to a DataBag which holds all the values in the input having the same key(k)
- * The getNext() reads an input from its predecessor and separates them into
- * key & value. It configures a foreach operator with the databags obtained
from
- * each Hashtable for the key and also with the value for the fragment input.
- * It then returns tuples returned by this foreach operator.
+ * The operator models the join keys using the Local Rearrange operators which
+ * are configured with the plan specified by the user. It also sets up one
+ * Hashtable per replicated input which maps the Key(k) stored as a Tuple to a
+ * DataBag which holds all the values in the input having the same key(k) The
+ * getNext() reads an input from its predecessor and separates them into key &
+ * value. It configures a foreach operator with the databags obtained from each
+ * Hashtable for the key and also with the value for the fragment input. It then
+ * returns tuples returned by this foreach operator.
*/
-//We intentionally skip type checking in backend for performance reasons
+// We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
public class POFRJoin extends PhysicalOperator {
/**
@@ -69,35 +68,48 @@
*/
private static final long serialVersionUID = 1L;
static private Log log = LogFactory.getLog(POFRJoin.class);
- //The number in the input list which denotes the fragmented input
+ // The number in the input list which denotes the fragmented input
private int fragment;
- //There can be n inputs each being a List<PhysicalPlan>
- //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+ // There can be n inputs each being a List<PhysicalPlan>
+ // Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
private List<List<PhysicalPlan>> phyPlanLists;
- //The key type for each Local Rearrange operator
+ // The key type for each Local Rearrange operator
private List<List<Byte>> keyTypes;
- //The Local Rearrange operators modeling the join key
+ // The Local Rearrange operators modeling the join key
private POLocalRearrange[] LRs;
- //The set of files that represent the replicated inputs
+ // The set of files that represent the replicated inputs
private FileSpec[] replFiles;
- //Used to configure the foreach operator
+ // Used to configure the foreach operator
private ConstantExpression[] constExps;
- //Used to produce the cross product of various bags
+ // Used to produce the cross product of various bags
private POForEach fe;
- //The array of Hashtables one per replicated input. replicates[fragment] = null
- private Map<Tuple,List<Tuple>> replicates[];
- //varaible which denotes whether we are returning tuples from the foreach operator
+ // The array of Hashtables one per replicated input. replicates[fragment] =
+ // null
+ // fragment is the input which is fragmented and not replicated.
+ private Map<Tuple, List<Tuple>> replicates[];
+ // variable which denotes whether we are returning tuples from the foreach
+ // operator
private boolean processingPlan;
- //A dummy tuple
+ // A dummy tuple
private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
- //An instance of tuple factory
+ // An instance of tuple factory
private transient TupleFactory mTupleFactory;
private transient BagFactory mBagFactory;
private boolean setUp;
-
- public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
- super(k,rp,inp);
-
+ // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
+ // Join
+ private boolean isLeftOuterJoin;
+
+ // This list contains nullTuples according to schema of various inputs
+ private DataBag nullBag;
+
+ public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
+ List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
+ FileSpec[] replFiles, int fragment, boolean isLeftOuter,
+ Tuple nullTuple)
+ throws ExecException {
+ super(k, rp, inp);
+
phyPlanLists = ppLists;
this.fragment = fragment;
this.keyTypes = keyTypes;
@@ -109,32 +121,39 @@
processingPlan = false;
mTupleFactory = TupleFactory.getInstance();
mBagFactory = BagFactory.getInstance();
+ List<Tuple> tupList = new ArrayList<Tuple>();
+ tupList.add(nullTuple);
+ nullBag = mBagFactory.newDefaultBag(tupList);
+ this.isLeftOuterJoin = isLeftOuter;
}
-
- public List<List<PhysicalPlan>> getJoinPlans(){
+
+ public List<List<PhysicalPlan>> getJoinPlans() {
return phyPlanLists;
}
-
- private OperatorKey genKey(OperatorKey old){
- return new OperatorKey(old.scope,NodeIdGenerator.getGenerator().getNextNodeId(old.scope));
+
+ private OperatorKey genKey(OperatorKey old) {
+ return new OperatorKey(old.scope, NodeIdGenerator.getGenerator()
+ .getNextNodeId(old.scope));
}
-
+
/**
* Configures the Local Rearrange operators & the foreach operator
+ *
* @param old
- * @throws ExecException
+ * @throws ExecException
*/
- private void createJoinPlans(OperatorKey old) throws ExecException{
+ private void createJoinPlans(OperatorKey old) throws ExecException {
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flatList = new ArrayList<Boolean>();
-
- int i=-1;
+
+ int i = -1;
for (List<PhysicalPlan> ppLst : phyPlanLists) {
++i;
POLocalRearrange lr = new POLocalRearrange(genKey(old));
lr.setIndex(i);
lr.setResultType(DataType.TUPLE);
- lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : keyTypes.get(i).get(0));
+ lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE
+ : keyTypes.get(i).get(0));
try {
lr.setPlans(ppLst);
} catch (PlanException pe) {
@@ -142,18 +161,22 @@
String msg = "Problem with setting up local rearrange's
plans.";
throw new ExecException(msg, errCode, PigException.BUG, pe);
}
- LRs[i]= lr;
+ LRs[i] = lr;
ConstantExpression ce = new ConstantExpression(genKey(old));
- ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+ ce.setResultType((i == fragment) ? DataType.TUPLE : DataType.BAG);
constExps[i] = ce;
PhysicalPlan pp = new PhysicalPlan();
pp.add(ce);
fePlans.add(pp);
flatList.add(true);
}
- fe = new POForEach(genKey(old),-1,fePlans,flatList);
+ // The ForEach operator here is used for generating a Cross-Product
+ // It is given a set of constant expressions with
+ // Tuple,(Bag|Tuple),(...)
+ // It does a cross product on that and produces output.
+ fe = new POForEach(genKey(old), -1, fePlans, flatList);
}
-
+
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitFRJoin(this);
@@ -161,18 +184,17 @@
@Override
public String name() {
- return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " +
mKey.toString();
+ return "FRJoin[" + DataType.findTypeName(resultType) + "]" + " - "
+ + mKey.toString();
}
@Override
public boolean supportsMultipleInputs() {
- // TODO Auto-generated method stub
return true;
}
@Override
public boolean supportsMultipleOutputs() {
- // TODO Auto-generated method stub
return false;
}
@@ -180,34 +202,36 @@
public Result getNext(Tuple t) throws ExecException {
Result res = null;
Result inp = null;
- if(!setUp){
+ if (!setUp) {
setUpHashMap();
setUp = true;
}
- if(processingPlan){
- //Return tuples from the for each operator
- //Assumes that it is configured appropriately with
- //the bags for the current key.
- while(true) {
+ if (processingPlan) {
+ // Return tuples from the for each operator
+ // Assumes that it is configured appropriately with
+ // the bags for the current key.
+ while (true) {
res = fe.getNext(dummyTuple);
-
- if(res.returnStatus==POStatus.STATUS_OK){
+
+ if (res.returnStatus == POStatus.STATUS_OK) {
return res;
}
- if(res.returnStatus==POStatus.STATUS_EOP){
- processingPlan = false;
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ // We have completed all cross-products; now it's time to move
+ // to the next tuple of the left side
+ processingPlan = false;
break;
}
- if(res.returnStatus==POStatus.STATUS_ERR) {
+ if (res.returnStatus == POStatus.STATUS_ERR) {
return res;
}
- if(res.returnStatus==POStatus.STATUS_NULL) {
+ if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
}
}
}
while (true) {
- //Process the current input
+ // Process the current input
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP
|| inp.returnStatus == POStatus.STATUS_ERR)
@@ -215,99 +239,128 @@
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
-
- //Separate Key & Value using the fragment's LR operator
+
+ // Separate Key & Value using the fragment's LR operator
POLocalRearrange lr = LRs[fragment];
- lr.attachInput((Tuple)inp.result);
+ lr.attachInput((Tuple) inp.result);
Result lrOut = lr.getNext(dummyTuple);
- if(lrOut.returnStatus!=POStatus.STATUS_OK) {
- log.error("LocalRearrange isn't configured right or is not
working");
+ if (lrOut.returnStatus != POStatus.STATUS_OK) {
+ log
+ .error("LocalRearrange isn't configured right or is
not working");
return new Result();
}
Tuple lrOutTuple = (Tuple) lrOut.result;
Tuple key = TupleFactory.getInstance().newTuple(1);
- key.set(0,lrOutTuple.get(1));
+ key.set(0, lrOutTuple.get(1));
Tuple value = getValueTuple(lr, lrOutTuple);
-
- //Configure the for each operator with the relevant bags
- int i=-1;
+
+ // Configure the for each operator with the relevant bags
+ int i = -1;
boolean noMatch = false;
for (ConstantExpression ce : constExps) {
++i;
- if(i==fragment){
+ if (i == fragment) {
+ // We set the first CE to the tuple from the fragmented (left) input
ce.setValue(value);
continue;
}
Map<Tuple, List<Tuple>> replicate = replicates[i];
- if(!replicate.containsKey(key)){
+ if (!replicate.containsKey(key)) {
+ if (isLeftOuterJoin) {
+ ce.setValue(nullBag);
+ }
noMatch = true;
break;
}
ce.setValue(mBagFactory.newDefaultBag(replicate.get(key)));
}
- if(noMatch)
+
+ // If this is not a left outer join and there was no match, we
+ // skip the processing of this left tuple and move ahead
+ if (!isLeftOuterJoin && noMatch)
continue;
fe.attachInput(dumTup);
processingPlan = true;
-
+
+ // We are all set; we call getNext (this function), which will call
+ // getNext on the ForEach, and that will return one tuple of the
+ // cross-product between the constant expressions set above.
+ // All subsequent calls (by the parent) to this function will return
+ // the next tuple of the cross-product.
Result gn = getNext(dummyTuple);
+
return gn;
}
}
/**
- * Builds the HashMaps by reading each replicated input from the DFS
- * using a Load operator
+ * Builds the HashMaps by reading each replicated input from the DFS using a
+ * Load operator
+ *
* @throws ExecException
*/
private void setUpHashMap() throws ExecException {
- int i=-1;
+ int i = -1;
long time1 = System.currentTimeMillis();
for (FileSpec replFile : replFiles) {
++i;
- if(i==fragment){
+
+ if (i == fragment) {
replicates[i] = null;
continue;
}
- POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
replFile, false);
- PigContext pc = new
PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+ POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
+ replFile, false);
+ PigContext pc = new PigContext(ExecType.MAPREDUCE,
+ ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
pc.connect();
ld.setPc(pc);
+ // We use the LocalRearrange operator to separate Key and Values
+ // eg. ( a, b, c ) would generate a, ( a, b, c );
+ // we use 'a' as the key to the HashMap and the rest,
+ // '( a, b, c )', is added to the HashMap as the value.
+ // We could have done this manually, but LocalRearrange does the
+ // same thing, so we utilize its functionality
POLocalRearrange lr = LRs[i];
- lr.setInputs(Arrays.asList((PhysicalOperator)ld));
- Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+ lr.setInputs(Arrays.asList((PhysicalOperator) ld));
+ Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(
+ 1000);
log.debug("Completed setup. Trying to build replication hash table");
int cnt = 0;
- for(Result res=lr.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(dummyTuple)){
+ for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
++cnt;
- if(reporter!=null) reporter.progress();
+ if (reporter != null)
+ reporter.progress();
Tuple tuple = (Tuple) res.result;
Tuple key = mTupleFactory.newTuple(1);
- key.set(0,tuple.get(1));
+ key.set(0, tuple.get(1));
Tuple value = getValueTuple(lr, tuple);
- if(!replicate.containsKey(key))
+ if (!replicate.containsKey(key))
replicate.put(key, new ArrayList<Tuple>());
replicate.get(key).add(value);
}
replicates[i] = replicate;
}
- long time2 = System.currentTimeMillis();
- log.debug("Hash Table built. Time taken: " + (time2-time1));
+ long time2 = System.currentTimeMillis();
+ log.debug("Hash Table built. Time taken: " + (time2 - time1));
}
-
- private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+
+ private void readObject(ObjectInputStream is) throws IOException,
+ ClassNotFoundException, ExecException {
is.defaultReadObject();
mTupleFactory = TupleFactory.getInstance();
mBagFactory = BagFactory.getInstance();
-// setUpHashTable();
+ // setUpHashTable();
}
-
+
/*
* Extracts the value tuple from the LR operator's output tuple
*/
- private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+ private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple)
+ throws ExecException {
Tuple val = (Tuple) tuple.get(2);
Tuple retTup = null;
boolean isProjectStar = lr.isProjectStar();
@@ -315,18 +368,18 @@
int keyLookupSize = keyLookup.size();
Object key = tuple.get(1);
boolean isKeyTuple = lr.isKeyTuple();
- Tuple keyAsTuple = isKeyTuple ? (Tuple)tuple.get(1) : null;
- if( keyLookupSize > 0) {
-
+ Tuple keyAsTuple = isKeyTuple ? (Tuple) tuple.get(1) : null;
+ if (keyLookupSize > 0) {
+
// we have some fields of the "value" in the
// "key".
retTup = mTupleFactory.newTuple();
int finalValueSize = keyLookupSize + val.size();
- int valIndex = 0; // an index for accessing elements from
- // the value (val) that we have currently
- for(int i = 0; i < finalValueSize; i++) {
+ int valIndex = 0; // an index for accessing elements from
+ // the value (val) that we have currently
+ for (int i = 0; i < finalValueSize; i++) {
Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
+ if (keyIndex == null) {
// the field for this index is not in the
// key - so just take it from the "value"
// we were handed
@@ -334,7 +387,7 @@
valIndex++;
} else {
// the field for this index is in the key
- if(isKeyTuple) {
+ if (isKeyTuple) {
// the key is a tuple, extract the
// field out of the tuple
retTup.append(keyAsTuple.get(keyIndex));
@@ -343,19 +396,19 @@
}
}
}
-
+
} else if (isProjectStar) {
-
+
// the whole "value" is present in the "key"
retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
-
+
} else {
-
+
// there is no field of the "value" in the
// "key" - so just make a copy of what we got
// as the "value"
retTup = mTupleFactory.newTuple(val.getAll());
-
+
}
return retTup;
}
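Summing up the POFRJoin changes: the operator builds one hash table per replicated input, and when a key from the fragmented input finds no match, a left outer join substitutes a bag holding one all-null tuple instead of dropping the row. A condensed, generic sketch of that lookup policy, independent of the operator plumbing (all names hypothetical):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical condensation of getNext()/setUpHashMap() above: for each
    // left row's key, either fetch the matching right rows or, for a left
    // outer join, emit a single all-null right row so the left row survives.
    final class LeftOuterLookup<K, R> {
        private final Map<K, List<R>> replicate = new HashMap<K, List<R>>();
        private final R nullRow;           // plays the role of nullTuple/nullBag
        private final boolean isLeftOuter;

        LeftOuterLookup(R nullRow, boolean isLeftOuter) {
            this.nullRow = nullRow;
            this.isLeftOuter = isLeftOuter;
        }

        void addRight(K key, R row) {      // mirrors setUpHashMap()
            List<R> rows = replicate.get(key);
            if (rows == null) {
                rows = new ArrayList<R>();
                replicate.put(key, rows);
            }
            rows.add(row);
        }

        // Returns the rows to cross with the left row, or null to skip it
        // (the inner-join behavior).
        List<R> rightRowsFor(K key) {
            List<R> rows = replicate.get(key);
            if (rows != null) {
                return rows;
            }
            if (isLeftOuter) {
                List<R> nullBag = new ArrayList<R>(1);
                nullBag.add(nullRow); // like the single-tuple nullBag built in the constructor
                return nullBag;
            }
            return null; // no match and not outer: drop the left row
        }
    }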
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov 3 23:27:41 2009
@@ -2008,15 +2008,15 @@
// For all types of join we create LOJoin and mark what type of join it is.
(
[<USING> ("\"replicated\"" {
- if(isOuter) {
- throw new ParseException("Replicated join does not
support (left|right|full) outer joins");
- }
+ if(isFullOuter || isRightOuter) {
+ throw new ParseException("Replicated join does not
support (right|full) outer joins");
+ }
frj = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
}
| "\"repl\"" {
- if(isOuter) {
- throw new ParseException("Replicated join does not
support (left|right|full) outer joins");
- }
+ if(isFullOuter || isRightOuter) {
+ throw new ParseException("Replicated join does not
support (right|full) outer joins");
+ }
frj=parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
}
|"\"skewed\"" {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Tue Nov 3 23:27:41 2009
@@ -22,8 +22,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -56,6 +59,7 @@
public class TestFRJoin extends TestCase{
private static final String INPUT_FILE = "testFrJoinInput.txt";
+ private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
private File tmpFile;
@@ -77,11 +81,21 @@
input[k++] = si + "\t" + j;
}
Util.createInputFile(cluster, INPUT_FILE, input);
+
+ String[] input2 = new String[2*(LOOP_SIZE/2)];
+ k = 0;
+ for(int i = 1; i <= LOOP_SIZE/2; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE/2;j++)
+ input2[k++] = si + "\t" + j;
+ }
+ Util.createInputFile(cluster, INPUT_FILE2, input2);
}
@After
public void tearDown() throws Exception {
Util.deleteFile(cluster, INPUT_FILE);
+ Util.deleteFile(cluster, INPUT_FILE2 );
}
public static class FRJoin extends EvalFunc<DataBag>{
@@ -408,8 +422,83 @@
Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
+
+ @Test
+ public void testFRJoinOut8() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as
(x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj =
BagFactory.getInstance().newDefaultBag();
+ Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+ Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0 using
\"replicated\";");
+ pigServer.registerQuery("D = join A by $1 left, B by $1 using
\"replicated\";");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashFRJoin.put( Key, tuple);
+ dbfrj.add(tuple);
+
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0;");
+ pigServer.registerQuery("D = join A by $1 left, B by $1;");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashJoin.put( Key, tuple);
+ dbshj.add(tuple);
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
@Test
+ public void testFRJoinOut9() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as
(x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj =
BagFactory.getInstance().newDefaultBag();
+ Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+ Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0 using
\"repl\";");
+ pigServer.registerQuery("D = join A by $1 left, B by $1 using
\"repl\";");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashFRJoin.put( Key, tuple);
+ dbfrj.add(tuple);
+
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0;");
+ pigServer.registerQuery("D = join A by $1 left, B by $1;");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashJoin.put( Key, tuple);
+ dbshj.add(tuple);
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
public void testFRJoinSch1() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832599&r1=832598&r2=832599&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Nov 3 23:27:41 2009
@@ -466,9 +466,20 @@
} catch(Exception e) {
errCaught = true;
- assertEquals(true, e.getMessage().contains("does not
support (left|right|full) outer joins"));
+ if( j == 0 || j == 1 ) {
+ // This is after adding support for left outer join to replicated join
+ assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
+ } else {
+ assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+ }
+ }
+ if( i == 0 && ( j == 0 || j== 1 ) ) {
+ // This is after adding support for left outer join to replicated join
+ assertEquals(false, errCaught);
+ }
+ else {
+ assertEquals(true, errCaught);
}
- assertEquals(true, errCaught);
}
}