Author: pradeepkth
Date: Tue Sep 8 17:40:12 2009
New Revision: 812595
URL: http://svn.apache.org/viewvc?rev=812595&view=rev
Log:
join ... outer, ... outer semantics are a no-ops, should produce corresponding
null values (pradeepkth)
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Sep 8 17:40:12 2009
@@ -28,6 +28,9 @@
IMPROVEMENTS
+PIG-578: join ... outer, ... outer semantics are a no-ops, should produce
+corresponding null values (pradeepkth)
+
PIG-936: making dump and PigDump independent from Tuple.toString (daijy)
PIG-890: Create a sampler interface and improve the skewed join sampler
(sriranjan via daijy)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
Tue Sep 8 17:40:12 2009
@@ -33,7 +33,11 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -42,6 +46,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.IsEmpty;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -928,39 +933,145 @@
poPackage.setKeyType(type);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
-
- boolean inner[] = new boolean[count];
- for (int i=0;i<count;i++) {
- inner[i] = true;
- }
- poPackage.setInner(inner);
-
+
+ boolean[] innerFlags = loj.getInnerFlags();
+ poPackage.setInner(innerFlags);
+
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flattenLst = new ArrayList<Boolean>();
- for(int i=1;i<=count;i++){
- PhysicalPlan fep1 = new PhysicalPlan();
- POProject feproj1 = new POProject(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)), loj.getRequestedParallelism(), i);
- feproj1.setResultType(DataType.BAG);
- feproj1.setOverloaded(false);
- fep1.add(feproj1);
- fePlans.add(fep1);
- flattenLst.add(true);
- }
- POForEach fe = new POForEach(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)), loj.getRequestedParallelism(), fePlans,
flattenLst );
- currentPlan.add(fe);
try{
+ for(int i=0;i< count;i++){
+ PhysicalPlan fep1 = new PhysicalPlan();
+ POProject feproj1 = new POProject(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)),
+ loj.getRequestedParallelism(), i+1); //i+1 since
the first column is the "group" field
+ feproj1.setResultType(DataType.BAG);
+ feproj1.setOverloaded(false);
+ fep1.add(feproj1);
+ fePlans.add(fep1);
+ // the parser would have marked the side
+ // where we need to keep empty bags on
+ // non matched as outer (innerFlags[i] would be
+ // false)
+ if(!(innerFlags[i])) {
+ LogicalOperator joinInput = inputs.get(i);
+ // for outer join add a bincond
+ // which will project nulls when bag is
+ // empty
+ updateWithEmptyBagCheck(fep1, joinInput);
+ }
+ flattenLst.add(true);
+ }
+
+ POForEach fe = new POForEach(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)),
+ loj.getRequestedParallelism(), fePlans, flattenLst );
+ currentPlan.add(fe);
currentPlan.connect(poPackage, fe);
+ LogToPhyMap.put(loj, fe);
}catch (PlanException e1) {
int errCode = 2015;
String msg = "Invalid physical operators in the physical
plan" ;
throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, e1);
}
- LogToPhyMap.put(loj, fe);
+
}
}
- private boolean validateMergeJoin(LOJoin loj) throws
LogicalToPhysicalTranslatorException{
+ /**
+ * Updates the plan with an empty-bag check: an empty bag is replaced by a bag
+ * holding one tuple with as many nulls as the input schema has fields
+ * @param fePlan the plan to update
+ * @param joinInput the relation for which the corresponding bag is
being checked
+ * @throws PlanException
+ * @throws LogicalToPhysicalTranslatorException
+ */
+ public static void updateWithEmptyBagCheck(PhysicalPlan fePlan,
LogicalOperator joinInput) throws PlanException,
LogicalToPhysicalTranslatorException {
+ Schema inputSchema = null;
+ try {
+ inputSchema = joinInput.getSchema();
+
+
+ if(inputSchema == null) {
+ int errCode = 1105;
+ String msg = "Input (" + joinInput.getAlias() + ") " +
+ "on which outer join is desired should have a valid
schema";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.INPUT);
+ }
+ } catch (FrontendException e) {
+ int errCode = 2014;
+ String msg = "Error while determining the schema of input";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.BUG, e);
+ }
+
+ // we currently have POProject[bag] as the only operator in the plan
+ // If the bag is an empty bag, we should replace
+ // it with a bag with one tuple with null fields so that when we
flatten
+ // we do not drop records (flatten will drop records if the bag is left
+ // as an empty bag) and actually project nulls for the fields in
+ // the empty bag
+
+ // So we need to get to the following state:
+ // POProject[Bag]
+ // \
+ // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
+ // \ | /
+ // POBinCond
+
+ POProject relationProject = (POProject) fePlan.getRoots().get(0);
+ try {
+
+ // condition of the bincond
+ POProject relationProjectForIsEmpty = relationProject.clone();
+ fePlan.add(relationProjectForIsEmpty);
+ String scope = relationProject.getOperatorKey().scope;
+ FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+ Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+ POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope,
NodeIdGenerator.getGenerator().
+ getNextNodeId(scope)), -1, null, isEmptySpec,
(EvalFunc) f);
+ isEmpty.setResultType(DataType.BOOLEAN);
+ fePlan.add(isEmpty);
+ fePlan.connect(relationProjectForIsEmpty, isEmpty);
+
+ // lhs of bincond (const bag with null fields)
+ ConstantExpression ce = new ConstantExpression(new
OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ // the following should give a tuple with the
+ // required number of nulls
+ Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+ for(int i = 0; i < inputSchema.size(); i++) {
+ t.set(i, null);
+ }
+ List<Tuple> bagContents = new ArrayList<Tuple>(1);
+ bagContents.add(t);
+ DataBag bg = new NonSpillableDataBag(bagContents);
+ ce.setValue(bg);
+ ce.setResultType(DataType.BAG);
+ //this operator doesn't have any predecessors
+ fePlan.add(ce);
+
+ //rhs of bincond is the original project
+ // let's set up the bincond now
+ POBinCond bincond = new POBinCond(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ bincond.setCond(isEmpty);
+ bincond.setLhs(ce);
+ bincond.setRhs(relationProject);
+ bincond.setResultType(DataType.BAG);
+ fePlan.add(bincond);
+
+ fePlan.connect(isEmpty, bincond);
+ fePlan.connect(ce, bincond);
+ fePlan.connect(relationProject, bincond);
+
+ } catch (Exception e) {
+ throw new PlanException("Error setting up outerjoin", e);
+ }
+
+
+ }
+
+ private boolean validateMergeJoin(LOJoin loj) throws
LogicalToPhysicalTranslatorException{
List<LogicalOperator> preds = loj.getInputs();
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
Tue Sep 8 17:40:12 2009
@@ -139,14 +139,12 @@
public void visit(LOJoin join) throws VisitorException {
String scope = join.getOperatorKey().scope;
List<LogicalOperator> inputs = join.getInputs();
+ boolean[] innerFlags = join.getInnerFlags();
// In local mode, LOJoin is achieved by POCogroup followed by a
POForEach with flatten
// Insert a POCogroup in the place of LOJoin
POCogroup poc = new POCogroup(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)), join.getRequestedParallelism());
- boolean innerArray[] = new boolean[join.getInputs().size()];
- for (int i=0;i<join.getInputs().size();i++)
- innerArray[i] = true;
- poc.setInner(innerArray);
+ poc.setInner(innerFlags);
currentPlan.add(poc);
@@ -207,6 +205,7 @@
// Append POForEach after POCogroup
List<Boolean> flattened = new ArrayList<Boolean>();
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+
for (int i=0;i<join.getInputs().size();i++)
{
PhysicalPlan ep = new PhysicalPlan();
@@ -217,6 +216,21 @@
prj.setStar(false);
ep.add(prj);
eps.add(ep);
+ // the parser would have marked the side
+ // where we need to keep empty bags on
+ // non matched as outer (innerFlags[i] would be
+ // false)
+ if(!(innerFlags[i])) {
+ LogicalOperator joinInput = inputs.get(i);
+ // for outer join add a bincond
+ // which will project nulls when bag is
+ // empty
+ try {
+ updateWithEmptyBagCheck(ep, joinInput);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
flattened.add(true);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Tue Sep
8 17:40:12 2009
@@ -63,7 +63,7 @@
*/
private static Log log = LogFactory.getLog(LOJoin.class);
private MultiMap<LogicalOperator, LogicalPlan> mJoinPlans;
-
+ private boolean[] mInnerFlags;
private JOINTYPE mJoinType; // Retains the type of the join
/**
@@ -81,10 +81,20 @@
LogicalPlan plan,
OperatorKey k,
MultiMap<LogicalOperator, LogicalPlan> joinPlans,
- JOINTYPE jt) {
+ JOINTYPE jt,
+ boolean[] isInner) {
super(plan, k);
mJoinPlans = joinPlans;
mJoinType = jt;
+ mInnerFlags = getCopy(isInner);
+ }
+
+ private boolean[] getCopy(boolean[] flags) {
+ boolean[] retVal = new boolean[flags.length];
+ for (int i = 0; i < flags.length; i++) {
+ retVal[i] = flags[i];
+ }
+ return retVal;
}
public List<LogicalOperator> getInputs() {
@@ -544,4 +554,11 @@
// shall not get here
return null;
}
+
+ /**
+ * @return a copy of the inner flags, one per join input (false marks an outer input)
+ */
+ public boolean[] getInnerFlags() {
+ return getCopy(mInnerFlags);
+ }
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Tue Sep 8 17:40:12 2009
@@ -351,7 +351,7 @@
isInner[i] = gi.isInner;
}
- LogicalOperator loj = new LOJoin(lp, new OperatorKey(scope,
getNextId()), joinPlans, jt);
+ LogicalOperator loj = new LOJoin(lp, new OperatorKey(scope,
getNextId()), joinPlans, jt, isInner);
lp.add(loj);
log.debug("Added operator " + loj.getClass().getName() + "
object " + loj + " to the logical plan " + lp);
@@ -863,6 +863,9 @@
TOKEN : { <STDOUT: "stdout"> }
TOKEN : { <LIMIT: "limit"> }
TOKEN : { <SAMPLE: "sample"> }
+TOKEN : { <LEFT: "left"> }
+TOKEN : { <RIGHT: "right"> }
+TOKEN : { <FULL: "full"> }
TOKEN:
{
@@ -1637,6 +1640,53 @@
}
+CogroupInput JoinItem(LogicalPlan lp) :
+{
+ LogicalOperator cgOp;
+ boolean isInner = true;
+ ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>();
+ LogicalPlan groupByPlan;
+ ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+ ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
+ log.trace("Entering JoinItem");
+ log.debug("LogicalPlan: " + lp);
+}
+{
+ (
+ cgOp = NestedExpr(lp)
+ (
+ ( <BY>
+ (
+ LOOKAHEAD ( "("
FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
+ ( "(" FlattenedGenerateItem(cgOp.getSchema(), null,
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
+ {listPlans.add(groupByPlan);}
+ (
+ "," FlattenedGenerateItem(cgOp.getSchema(), null,
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
+ {listPlans.add(groupByPlan);}
+ )*
+ ")"
+ )
+ | (
+ FlattenedGenerateItem(cgOp.getSchema(), null,
groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
+ {listPlans.add(groupByPlan);}
+ )
+ )
+ )
+ )
+ )
+ {
+ CogroupInput cogroupInput = new CogroupInput();
+
+ cogroupInput.plans = listPlans;
+ cogroupInput.op = cgOp;
+ cogroupInput.isInner = isInner;
+
+ log.trace("Exiting GroupItem");
+ return cogroupInput;
+ }
+}
+
+
CogroupInput GroupItem(LogicalPlan lp) :
{
ExpressionOperator es;
@@ -1870,15 +1920,86 @@
LogicalOperator frj = null;
LogicalOperator skj = null;
LogicalOperator smj = null;
+ boolean isLeftOuter = false;
+ boolean isRightOuter = false;
+ boolean isFullOuter = false;
+ boolean isOuter = false;
}
{
- (gi = GroupItem(lp) { gis.add(gi); }
- ("," gi = GroupItem(lp) { gis.add(gi); })+
+ (gi = JoinItem(lp) { gis.add(gi); }
+ [
+ (<LEFT> [<OUTER>] { isLeftOuter = true;})
+ |
+ (<RIGHT> [<OUTER>] {isRightOuter = true;})
+ |
+ (<FULL> [<OUTER>] {isFullOuter = true;})
+ ]
+ ("," gi = JoinItem(lp) { gis.add(gi); })+
+
+ {
+ // in the case of outer joins, only two
+ // inputs are allowed
+ isOuter = (isLeftOuter || isRightOuter || isFullOuter);
+ if(isOuter && gis.size() > 2) {
+ throw new ParseException("(left|right|full) outer joins are
only supported for two inputs");
+ }
+
+ // we have exactly two inputs
+
+ // the semantics of "outer"
+ // for join are different from cogroup
+ // cogroup a by $0 inner, b by $0 outer means keep
+ // all keys from a and for cases where there is no match
+ // from b have an empty bag for b. For keys in b which
+ // do not match in a, there will be no output records.
+ // Whereas with join,
+ // join a by $0 inner, b by $0 outer implies right outer
+ // join which has the exact opposite semantics - for
+ // all keys in b which do not have a match in a we need to
+ // output null for fields in a. For keys in a which do not
+ // match in b, no record should be output. Since we will be
+ // using the same underlying implementation for outer join
+ // as cogroup we should achieve join semantics by setting the
+ // isinner flag accordingly
+ if (isLeftOuter) {
+ gis.get(1).isInner = false;
+ } else if (isRightOuter) {
+ gis.get(0).isInner = false;
+ } else if (isFullOuter) {
+ gis.get(0).isInner = false;
+ gis.get(1).isInner = false;
+ }
+
+ }
// For all types of join we create LOJoin and mark what type of join it
is.
- ([<USING> ("\"replicated\"" { frj = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED); }
- | "\"repl\"" { frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);}
- |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); }
- |"\"merge\"" { smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); })] ))
+ (
+ [<USING> ("\"replicated\"" {
+ if(isOuter) {
+ throw new ParseException("Replicated join does not
support (left|right|full) outer joins");
+ }
+ frj = parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
+ }
+ | "\"repl\"" {
+ if(isOuter) {
+ throw new ParseException("Replicated join does not
support (left|right|full) outer joins");
+ }
+ frj=parseJoin(gis, lp,
LOJoin.JOINTYPE.REPLICATED);
+ }
+ |"\"skewed\"" {
+ if(isOuter) {
+ throw new ParseException("Skewed join does not support
(left|right|full) outer joins");
+ }
+ skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
+ }
+ |"\"merge\"" {
+ if(isOuter) {
+ throw new ParseException("Merge join does not support
(left|right|full) outer joins");
+ }
+ smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
+ })
+ ]
+ )
+ )
{log.trace("Exiting JoinClause");
if (frj!=null) {
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=812595&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Sep 8 17:40:12
2009
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.Identity;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test cases to test join statement
+ */
+public class TestJoin extends TestCase {
+
+ MiniCluster cluster;
+ private PigServer pigServer;
+
+ TupleFactory mTf = TupleFactory.getInstance();
+ BagFactory mBf = BagFactory.getInstance();
+
+ @Before
+ @Override
+ public void setUp() throws Exception{
+ FileLocalizer.setR(new Random());
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+
+ @Test
+ public void testDefaultJoin() throws IOException, ParseException {
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+ };
+
+ Util.createInputFile(cluster, "a.txt", input1);
+ Util.createInputFile(cluster, "b.txt", input2);
+ Tuple expectedResult =
(Tuple)Util.getPigConstant("('hello',1,'hello','world')");
+
+ // with schema
+ String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+ "b = load 'b.txt' as (n:chararray, m:chararray); " +
+ "c = join a by $0, b by $0;";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertEquals(false, it.hasNext());
+
+ // without schema
+ script = "a = load 'a.txt'; " +
+ "b = load 'b.txt'; " +
+ "c = join a by $0, b by $0;";
+ Util.registerMultiLineQuery(pigServer, script);
+ it = pigServer.openIterator("c");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult.toString(), it.next().toString());
+ assertEquals(false, it.hasNext());
+ Util.deleteFile(cluster, "a.txt");
+ Util.deleteFile(cluster, "b.txt");
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws IOException, ParseException {
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
+
+ Util.createInputFile(cluster, "a.txt", input1);
+ Util.createInputFile(cluster, "b.txt", input2);
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('hello',1,'hello','world')",
+ "('bye',2,null,null)",
+ "(null,3,null,null)"
+ });
+
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ //with schema
+ String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+ "b = load 'b.txt' as (n:chararray, m:chararray); ";
+ if(i == 0) {
+ script += "c = join a by $0 left outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 left, b by $0;" ;
+ }
+ script += "d = order c by $1;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++), it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load 'a.txt'; " +
+ "b = load 'b.txt'; ";
+ if(i == 0) {
+ script += "c = join a by $0 left outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 left, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
+ }
+ }
+ Util.deleteFile(cluster, "a.txt");
+ Util.deleteFile(cluster, "b.txt");
+ }
+
+ @Test
+ public void testRightOuterJoin() throws IOException, ParseException {
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
+
+ Util.createInputFile(cluster, "a.txt", input1);
+ Util.createInputFile(cluster, "b.txt", input2);
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(null,null,null,'evening')",
+ "(null,null,'good','morning')",
+ "('hello',1,'hello','world')"
+ });
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ // with schema
+ String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+ "b = load 'b.txt' as (n:chararray, m:chararray); ";
+ if(i == 0) {
+ script += "c = join a by $0 right outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 right, b by $0;" ;
+ }
+ script += "d = order c by $3;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++), it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load 'a.txt'; " +
+ "b = load 'b.txt'; " ;
+ if(i == 0) {
+ script += "c = join a by $0 right outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 right, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
+ }
+ }
+ Util.deleteFile(cluster, "a.txt");
+ Util.deleteFile(cluster, "b.txt");
+ }
+
+ @Test
+ public void testFullOuterJoin() throws IOException, ParseException {
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
+
+ Util.createInputFile(cluster, "a.txt", input1);
+ Util.createInputFile(cluster, "b.txt", input2);
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(null,null,null,'evening')" ,
+ "(null,null,'good','morning')" ,
+ "('hello',1,'hello','world')" ,
+ "('bye',2,null,null)" ,
+ "(null,3,null,null)"
+ });
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ // with schema
+ String script = "a = load 'a.txt' as (n:chararray, a:int); " +
+ "b = load 'b.txt' as (n:chararray, m:chararray); ";
+ if(i == 0) {
+ script += "c = join a by $0 full outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 full, b by $0;" ;
+ }
+ script += "d = order c by $1, $3;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++), it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load 'a.txt'; " +
+ "b = load 'b.txt'; " ;
+ if(i == 0) {
+ script += "c = join a by $0 full outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 full, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
+ }
+ }
+ Util.deleteFile(cluster, "a.txt");
+ Util.deleteFile(cluster, "b.txt");
+ }
+
+ @Test
+ public void testMultiOuterJoinFailure() {
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
+ lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
+ lpt.buildPlan("c = load 'c.txt' as (n:chararray, m:chararray); ");
+ String[] types = new String[] { "left", "right", "full" };
+ for (int i = 0; i < types.length; i++) {
+ boolean errCaught = false;
+ try {
+ lpt.buildPlanThrowExceptionOnError("d = join a by $0 " +
types[i] + " outer, b by $0, c by $0;") ;
+
+ } catch(Exception e) {
+ errCaught = true;
+ assertEquals("(left|right|full) outer joins are only supported
for two inputs", e.getMessage());
+ }
+ assertEquals(true, errCaught);
+
+ }
+
+ }
+
+ @Test
+ public void testNonRegularOuterJoinFailure() {
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
+ lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
+ String[] types = new String[] { "left", "right", "full" };
+ String[] joinTypes = new String[] { "replicated", "repl", "skewed",
"merge" };
+ for (int i = 0; i < types.length; i++) {
+ for(int j = 0; j < joinTypes.length; j++) {
+ boolean errCaught = false;
+ try {
+ lpt.buildPlanThrowExceptionOnError(
+ "d = join a by $0 " + types[i] + " outer, b by $0
using \"" + joinTypes[j] +"\" ;") ;
+
+ } catch(Exception e) {
+ errCaught = true;
+ assertEquals(true, e.getMessage().contains("does not
support (left|right|full) outer joins"));
+ }
+ assertEquals(true, errCaught);
+ }
+
+ }
+
+ }
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Tue Sep 8 17:40:12 2009
@@ -363,11 +363,19 @@
return schema;
}
- static Object getPigConstant(String pigConstantAsString) throws
ParseException {
+ public static Object getPigConstant(String pigConstantAsString) throws
ParseException {
ByteArrayInputStream stream = new
ByteArrayInputStream(pigConstantAsString.getBytes()) ;
QueryParser queryParser = new QueryParser(stream) ;
return queryParser.Datum();
}
+
+ public static List<Tuple> getTuplesFromConstantTupleStrings(String[]
tupleConstants) throws ParseException {
+ List<Tuple> result = new ArrayList<Tuple>(tupleConstants.length);
+ for(int i = 0; i < tupleConstants.length; i++) {
+ result.add((Tuple) getPigConstant(tupleConstants[i]));
+ }
+ return result;
+ }
public static File createFile(String[] data) throws Exception{
File f = File.createTempFile("tmp", "");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=812595&r1=812594&r2=812595&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java Tue
Sep 8 17:40:12 2009
@@ -23,6 +23,7 @@
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.optimizer.SchemaCalculator;
import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.PlanValidationException;
@@ -82,6 +83,10 @@
public LogicalPlan buildPlan(String query) {
return buildPlan(query, LogicalPlanBuilder.class.getClassLoader());
}
+
+ public LogicalPlan buildPlanThrowExceptionOnError(String query) throws
Exception {
+ return buildPlanThrowExceptionOnError(query,
LogicalPlanBuilder.class.getClassLoader());
+ }
/***
@@ -207,30 +212,8 @@
LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
try {
- LogicalPlan lp = builder.parse(SCOPE,
- query,
- aliases,
- logicalOpTable,
- aliasOp,
- fileNameMap);
-
- List<LogicalOperator> roots = lp.getRoots();
-
- if(roots.size() > 0) {
- if (logicalOpTable.get(roots.get(0)) instanceof
LogicalOperator){
- System.out.println(query);
- System.out.println(logicalOpTable.get(roots.get(0)));
- }
- if ((roots.get(0)).getAlias()!=null){
- aliases.put(roots.get(0), lp);
- }
- }
-
- assertTrue(lp != null);
-
- return lp ;
- }
- catch (IOException e) {
+ return parse(query, builder);
+ } catch (IOException e) {
fail("IOException: " + e.getMessage());
}
catch (Exception e) {
@@ -239,7 +222,47 @@
}
return null;
}
+
+ private LogicalPlan parse(String query, LogicalPlanBuilder builder) throws
IOException, ParseException {
+ LogicalPlan lp = builder.parse(SCOPE,
+ query,
+ aliases,
+ logicalOpTable,
+ aliasOp,
+ fileNameMap);
+
+ List<LogicalOperator> roots = lp.getRoots();
+
+ if(roots.size() > 0) {
+ if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
+ System.out.println(query);
+ System.out.println(logicalOpTable.get(roots.get(0)));
+ }
+ if ((roots.get(0)).getAlias()!=null){
+ aliases.put(roots.get(0), lp);
+ }
+ }
+
+ assertTrue(lp != null);
+
+ return lp ;
+
+ }
+ private LogicalPlan buildPlanThrowExceptionOnError (String query,
ClassLoader cldr) throws IOException, ParseException {
+
+ LogicalPlanBuilder.classloader =
LogicalPlanTester.class.getClassLoader() ;
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new
Properties());
+ try {
+ pigContext.connect();
+ } catch (ExecException e1) {
+ fail(e1.getClass().getName() + ": " + e1.getMessage() + " -- " +
query);
+ }
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
+
+ return parse(query, builder);
+ }
+
public void setPlan(LogicalPlan lp) throws VisitorException {
PlanSetter ps = new PlanSetter(lp);
ps.visit();