Author: pradeepkth
Date: Wed Sep 16 23:57:21 2009
New Revision: 816012
URL: http://svn.apache.org/viewvc?rev=816012&view=rev
Log:
PIG-963: Join in local mode matches null keys (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=816012&r1=816011&r2=816012&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep 16 23:57:21 2009
@@ -40,6 +40,7 @@
BUG FIXES
+PIG-963: Join in local mode matches null keys (pradeepkth)
PIG-660: Integration with Hadoop 20 (sms via olgan)
PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=816012&r1=816011&r2=816012&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
Wed Sep 16 23:57:21 2009
@@ -231,9 +231,16 @@
Object t1 = null;
Object t2 = null;
try {
- t1 = o1.get(1);
- t2 = o2.get(1);
-
+ // get the keys
+ t1 = o1.get(1);
+ t2 = o2.get(1);
+ if(t1 == t2 && t1 == null) {
+ // null keys from different inputs
+ // are not treated as equals
+ int firstInputIndex = (Byte)(o1.get(0));
+ int secondInputIndex = (Byte)(o2.get(0));
+ return firstInputIndex - secondInputIndex;
+ }
} catch (ExecException e) {
// TODO Auto-generated catch block
throw new RuntimeException("Error comparing tuples");
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=816012&r1=816011&r2=816012&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Sep 16
23:57:21 2009
@@ -444,6 +444,13 @@
static Random r = new Random();
static ContainerDescriptor relativeRoot;
static boolean initialized = false;
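+ /** Sets the flag that init() tests; false makes the next init() call run its body again.
+ * @param initialized the new value of the initialized flag
+ */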
+ public static void setInitialized(boolean initialized) {
+ FileLocalizer.initialized = initialized;
+ }
+
static private void init(final PigContext pigContext) throws
DataStorageException {
if (!initialized) {
initialized = true;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=816012&r1=816011&r2=816012&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Wed Sep 16 23:57:21
2009
@@ -18,6 +18,7 @@
package org.apache.pig.test;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -26,6 +27,7 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -49,317 +51,365 @@
TupleFactory mTf = TupleFactory.getInstance();
BagFactory mBf = BagFactory.getInstance();
+ ExecType[] execTypes = new ExecType[] {ExecType.LOCAL, ExecType.MAPREDUCE};
@Before
@Override
public void setUp() throws Exception{
FileLocalizer.setR(new Random());
- cluster = MiniCluster.buildCluster();
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
}
+ private void setUp(ExecType execType) throws ExecException {
+ // cause a reinitialization of FileLocalizer's
+ // internal state
+ FileLocalizer.setInitialized(false);
+ if(execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
+ } else if(execType == ExecType.LOCAL) {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+ }
+
+ private String createInputFile(ExecType execType, String fileNameHint,
String[] data) throws IOException {
+ String fileName = "";
+ if(execType == ExecType.MAPREDUCE) {
+ Util.createInputFile(cluster, fileNameHint, data);
+ fileName = fileNameHint;
+ } else if (execType == ExecType.LOCAL) {
+ File f = Util.createInputFile("test", fileNameHint, data);
+ fileName = "file://" + f.getAbsolutePath();
+ }
+ return fileName;
+ }
+
+ private void deleteInputFile(ExecType execType, String fileName) throws
IOException {
+
+ if(execType == ExecType.MAPREDUCE) {
+ Util.deleteFile(cluster, fileName);
+ } else if(execType == ExecType.LOCAL){
+ fileName = fileName.replace("file://", "");
+ new File(fileName).delete();
+ }
+ }
+
@Test
public void testDefaultJoin() throws IOException, ParseException {
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
- };
-
- Util.createInputFile(cluster, "a.txt", input1);
- Util.createInputFile(cluster, "b.txt", input2);
- Tuple expectedResult =
(Tuple)Util.getPigConstant("('hello',1,'hello','world')");
-
- // with schema
- String script = "a = load 'a.txt' as (n:chararray, a:int); " +
- "b = load 'b.txt' as (n:chararray, m:chararray); " +
- "c = join a by $0, b by $0;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("c");
- assertEquals(true, it.hasNext());
- assertEquals(expectedResult, it.next());
- assertEquals(false, it.hasNext());
-
- // without schema
- script = "a = load 'a.txt'; " +
- "b = load 'b.txt'; " +
- "c = join a by $0, b by $0;";
- Util.registerMultiLineQuery(pigServer, script);
- it = pigServer.openIterator("c");
- assertEquals(true, it.hasNext());
- assertEquals(expectedResult.toString(), it.next().toString());
- assertEquals(false, it.hasNext());
- Util.deleteFile(cluster, "a.txt");
- Util.deleteFile(cluster, "b.txt");
+ for (ExecType execType : execTypes) {
+ setUp(execType);
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+ };
+
+ String firstInput = createInputFile(execType, "a.txt", input1);
+ String secondInput = createInputFile(execType, "b.txt", input2);
+ Tuple expectedResult =
(Tuple)Util.getPigConstant("('hello',1,'hello','world')");
+
+ // with schema
+ String script = "a = load '"+ firstInput +"' as (n:chararray,
a:int); " +
+ "b = load '"+ secondInput +"' as (n:chararray,
m:chararray); " +
+ "c = join a by $0, b by $0;";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertEquals(false, it.hasNext());
+
+ // without schema
+ script = "a = load '"+ firstInput + "'; " +
+ "b = load '" + secondInput + "'; " +
+ "c = join a by $0, b by $0;";
+ Util.registerMultiLineQuery(pigServer, script);
+ it = pigServer.openIterator("c");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult.toString(), it.next().toString());
+ assertEquals(false, it.hasNext());
+ deleteInputFile(execType, firstInput);
+ deleteInputFile(execType, secondInput);
+ }
}
@Test
public void testJoinSchema() throws Exception {
- String[] input1 = {
- "1\t2",
- "2\t3",
- "3\t4"
- };
- String[] input2 = {
- "1\thello",
- "4\tbye",
- };
-
- Util.createInputFile(cluster, "a.txt", input1);
- Util.createInputFile(cluster, "b.txt", input2);
- Tuple expectedResult =
(Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
-
- // with schema
- String script = "a = load 'a.txt' as (i:int, j:int); " +
- "b = load 'b.txt' as (k:int, l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate i,j,k,l,a::i,a::j,b::k,b::l;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- assertEquals(true, it.hasNext());
- assertEquals(expectedResult, it.next());
- assertEquals(false, it.hasNext());
-
- // schema with duplicates
- script = "a = load 'a.txt' as (i:int, j:int); " +
- "b = load 'b.txt' as (i:int, l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate i,j,l,a::i,a::j,b::i,b::l;";
- boolean exceptionThrown = false;
- try{
+ for (ExecType execType : execTypes) {
+ setUp(execType);
+ String[] input1 = {
+ "1\t2",
+ "2\t3",
+ "3\t4"
+ };
+ String[] input2 = {
+ "1\thello",
+ "4\tbye",
+ };
+
+ String firstInput = createInputFile(execType, "a.txt", input1);
+ String secondInput = createInputFile(execType, "b.txt", input2);
+ Tuple expectedResult =
(Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
+
+ // with schema
+ String script = "a = load '"+ firstInput +"' as (i:int, j:int); " +
+ "b = load '"+ secondInput +"' as (k:int, l:chararray); " +
+ "c = join a by $0, b by $0;" +
+ "d = foreach c generate i,j,k,l,a::i,a::j,b::k,b::l;";
Util.registerMultiLineQuery(pigServer, script);
- }catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1025, pe.getErrorCode());
- exceptionThrown = true;
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertEquals(false, it.hasNext());
+
+ // schema with duplicates
+ script = "a = load '"+ firstInput +"' as (i:int, j:int); " +
+ "b = load '"+ secondInput +"' as (i:int, l:chararray); " +
+ "c = join a by $0, b by $0;" +
+ "d = foreach c generate i,j,l,a::i,a::j,b::i,b::l;";
+ boolean exceptionThrown = false;
+ try{
+ Util.registerMultiLineQuery(pigServer, script);
+ }catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1025, pe.getErrorCode());
+ exceptionThrown = true;
+ }
+ assertEquals(true, exceptionThrown);
+
+ // schema with duplicates with resolution
+ script = "a = load '"+ firstInput +"' as (i:int, j:int); " +
+ "b = load '"+ secondInput +"' as (i:int, l:chararray); " +
+ "c = join a by $0, b by $0;" +
+ "d = foreach c generate a::i,j,b::i,l,a::i,a::j,b::i,b::l;";
+ Util.registerMultiLineQuery(pigServer, script);
+ it = pigServer.openIterator("d");
+ assertEquals(true, it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertEquals(false, it.hasNext());
+ deleteInputFile(execType, firstInput);
+ deleteInputFile(execType, secondInput);
}
- assertEquals(true, exceptionThrown);
-
- // schema with duplicates with resolution
- script = "a = load 'a.txt' as (i:int, j:int); " +
- "b = load 'b.txt' as (i:int, l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate a::i,j,b::i,l,a::i,a::j,b::i,b::l;";
- Util.registerMultiLineQuery(pigServer, script);
- it = pigServer.openIterator("d");
- assertEquals(true, it.hasNext());
- assertEquals(expectedResult, it.next());
- assertEquals(false, it.hasNext());
- Util.deleteFile(cluster, "a.txt");
- Util.deleteFile(cluster, "b.txt");
}
@Test
public void testLeftOuterJoin() throws IOException, ParseException {
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- Util.createInputFile(cluster, "a.txt", input1);
- Util.createInputFile(cluster, "b.txt", input2);
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('hello',1,'hello','world')",
- "('bye',2,null,null)",
- "(null,3,null,null)"
- });
-
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- //with schema
- String script = "a = load 'a.txt' as (n:chararray, a:int); " +
- "b = load 'b.txt' as (n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 left outer, b by $0;" ;
- } else {
- script += "c = join a by $0 left, b by $0;" ;
- }
- script += "d = order c by $1;";
- // ensure we parse correctly
- LogicalPlanTester lpt = new LogicalPlanTester();
- lpt.buildPlan(script);
+ for (ExecType execType : execTypes) {
+ setUp(execType);
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++), it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load 'a.txt'; " +
- "b = load 'b.txt'; ";
+ String firstInput = createInputFile(execType, "a.txt", input1);
+ String secondInput = createInputFile(execType, "b.txt", input2);
+ List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('hello',1,'hello','world')",
+ "('bye',2,null,null)",
+ "(null,3,null,null)"
+ });
+
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ //with schema
+ String script = "a = load '"+ firstInput +"' as (n:chararray,
a:int); " +
+ "b = load '"+ secondInput +"' as (n:chararray,
m:chararray); ";
if(i == 0) {
script += "c = join a by $0 left outer, b by $0;" ;
} else {
script += "c = join a by $0 left, b by $0;" ;
}
- try {
+ script += "d = order c by $1;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++),
it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load '"+ firstInput +"'; " +
+ "b = load '"+ secondInput +"'; ";
+ if(i == 0) {
+ script += "c = join a by $0 left outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 left, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
}
}
+ deleteInputFile(execType, firstInput);
+ deleteInputFile(execType, secondInput);
}
- Util.deleteFile(cluster, "a.txt");
- Util.deleteFile(cluster, "b.txt");
}
@Test
public void testRightOuterJoin() throws IOException, ParseException {
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- Util.createInputFile(cluster, "a.txt", input1);
- Util.createInputFile(cluster, "b.txt", input2);
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(null,null,null,'evening')",
- "(null,null,'good','morning')",
- "('hello',1,'hello','world')"
- });
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- // with schema
- String script = "a = load 'a.txt' as (n:chararray, a:int); " +
- "b = load 'b.txt' as (n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 right outer, b by $0;" ;
- } else {
- script += "c = join a by $0 right, b by $0;" ;
- }
- script += "d = order c by $3;";
- // ensure we parse correctly
- LogicalPlanTester lpt = new LogicalPlanTester();
- lpt.buildPlan(script);
+ for (ExecType execType : execTypes) {
+ setUp(execType);
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++), it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load 'a.txt'; " +
- "b = load 'b.txt'; " ;
+ String firstInput = createInputFile(execType, "a.txt", input1);
+ String secondInput = createInputFile(execType, "b.txt", input2);
+ List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(null,null,null,'evening')",
+ "(null,null,'good','morning')",
+ "('hello',1,'hello','world')"
+ });
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ // with schema
+ String script = "a = load '"+ firstInput +"' as (n:chararray,
a:int); " +
+ "b = load '"+ secondInput +"' as (n:chararray,
m:chararray); ";
if(i == 0) {
script += "c = join a by $0 right outer, b by $0;" ;
} else {
script += "c = join a by $0 right, b by $0;" ;
}
- try {
+ script += "d = order c by $3;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++),
it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load '"+ firstInput +"'; " +
+ "b = load '"+ secondInput +"'; " ;
+ if(i == 0) {
+ script += "c = join a by $0 right outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 right, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
}
}
+ deleteInputFile(execType, firstInput);
+ deleteInputFile(execType, secondInput);
}
- Util.deleteFile(cluster, "a.txt");
- Util.deleteFile(cluster, "b.txt");
}
@Test
public void testFullOuterJoin() throws IOException, ParseException {
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- Util.createInputFile(cluster, "a.txt", input1);
- Util.createInputFile(cluster, "b.txt", input2);
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(null,null,null,'evening')" ,
- "(null,null,'good','morning')" ,
- "('hello',1,'hello','world')" ,
- "('bye',2,null,null)" ,
- "(null,3,null,null)"
- });
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- // with schema
- String script = "a = load 'a.txt' as (n:chararray, a:int); " +
- "b = load 'b.txt' as (n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 full outer, b by $0;" ;
- } else {
- script += "c = join a by $0 full, b by $0;" ;
- }
- script += "d = order c by $1, $3;";
- // ensure we parse correctly
- LogicalPlanTester lpt = new LogicalPlanTester();
- lpt.buildPlan(script);
+ for (ExecType execType : execTypes) {
+ setUp(execType);
+ String[] input1 = {
+ "hello\t1",
+ "bye\t2",
+ "\t3"
+ };
+ String[] input2 = {
+ "hello\tworld",
+ "good\tmorning",
+ "\tevening"
+
+ };
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++), it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load 'a.txt'; " +
- "b = load 'b.txt'; " ;
+ String firstInput = createInputFile(execType, "a.txt", input1);
+ String secondInput = createInputFile(execType, "b.txt", input2);
+ List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(null,null,null,'evening')" ,
+ "(null,null,'good','morning')" ,
+ "('hello',1,'hello','world')" ,
+ "('bye',2,null,null)" ,
+ "(null,3,null,null)"
+ });
+ // with and without optional outer
+ for(int i = 0; i < 2; i++) {
+ // with schema
+ String script = "a = load '"+ firstInput +"' as (n:chararray,
a:int); " +
+ "b = load '"+ secondInput +"' as (n:chararray,
m:chararray); ";
if(i == 0) {
script += "c = join a by $0 full outer, b by $0;" ;
} else {
script += "c = join a by $0 full, b by $0;" ;
}
- try {
+ script += "d = order c by $1, $3;";
+ // ensure we parse correctly
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan(script);
+
+ // run query and test results only once
+ if(i == 0) {
Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ int counter= 0;
+ while(it.hasNext()) {
+ assertEquals(expectedResults.get(counter++),
it.next());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ // without schema
+ script = "a = load '"+ firstInput +"'; " +
+ "b = load '"+ secondInput +"'; " ;
+ if(i == 0) {
+ script += "c = join a by $0 full outer, b by $0;" ;
+ } else {
+ script += "c = join a by $0 full, b by $0;" ;
+ }
+ try {
+ Util.registerMultiLineQuery(pigServer, script);
+ } catch (Exception e) {
+ PigException pe = LogUtils.getPigException(e);
+ assertEquals(1105, pe.getErrorCode());
+ }
}
}
+ deleteInputFile(execType, firstInput);
+ deleteInputFile(execType, secondInput);
}
- Util.deleteFile(cluster, "a.txt");
- Util.deleteFile(cluster, "b.txt");
}
@Test