Author: yanz
Date: Fri Dec 11 18:27:04 2009
New Revision: 889750
URL: http://svn.apache.org/viewvc?rev=889750&view=rev
Log:
PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Fri Dec 11 18:27:04 2009
@@ -38,6 +38,9 @@
OPTIMIZATIONS
BUG FIXES
+
+ PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)
+
PIG-1074 Zebra store function should allow '::' in column names in output
schema (yanz via gates)
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Fri Dec 11 18:27:04 2009
@@ -1137,12 +1137,12 @@
@Override
public boolean seekTo(BytesWritable key) throws IOException {
- boolean first = false, cur;
+ boolean first = false, cur, firstset = false;
for (int nx = 0; nx < cgScanners.length; nx++) {
if (cgScanners[nx] == null)
continue;
cur = cgScanners[nx].seekTo(key);
- if (nx != 0) {
+ if (firstset) {
if (cur != first) {
throw new IOException(
"seekTo() failed: Column Groups are not evenly positioned.");
@@ -1150,6 +1150,7 @@
}
else {
first = cur;
+ firstset = true;
}
}
return first;
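
For context, here is how the corrected seekTo() reads once the two hunks above are applied (a sketch assembled from the diff context only; cgScanners and the surrounding scanner class belong to BasicTable). The old code used "nx != 0" to decide whether the reference result had already been taken, which breaks when the column group at index 0 has a null scanner, for example because projection dropped it:

    @Override
    public boolean seekTo(BytesWritable key) throws IOException {
      // "first" holds the seek result of the first non-null column group
      // scanner; "firstset" records whether that reference value has been
      // taken yet, so skipped (null) column groups no longer distort the
      // consistency check below.
      boolean first = false, cur, firstset = false;
      for (int nx = 0; nx < cgScanners.length; nx++) {
        if (cgScanners[nx] == null)
          continue;
        cur = cgScanners[nx].seekTo(key);
        if (firstset) {
          if (cur != first) {
            throw new IOException(
                "seekTo() failed: Column Groups are not evenly positioned.");
          }
        } else {
          first = cur;
          firstset = true;
        }
      }
      return first;
    }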
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Fri Dec 11 18:27:04 2009
@@ -1095,7 +1095,7 @@
int index =
cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
comparator);
- if (index > endIndex) {
+ if (index >= endIndex) {
seekToEnd();
return false;
}
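
The ColumnGroup change is a boundary fix on the same seek path. lowerBound() returns the position of the first index entry that is not smaller than the key, and endIndex marks the end of the range this scanner covers; the change treats index == endIndex as already past that range. A sketch of the corrected check with its surrounding lines from the hunk (reading endIndex as exclusive is the interpretation the change implies):

      int index =
          cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
              comparator);
      // index == endIndex means the key sorts after every row this scanner
      // covers, so position at the end and report failure instead of
      // scanning past the range, which is what surfaced as the EOF of
      // PIG-1145 on large tables.
      if (index >= endIndex) {
        seekToEnd();
        return false;
      }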
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Fri Dec 11 18:27:04 2009
@@ -373,6 +373,7 @@
{
throw new IOException("The table is not properly
sorted");
}
+ setSorted(conf);
} else {
List<LeafTableInfo> leaves = expr.getLeafTables(null);
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
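
The TableInputFormat change records the sorted property in the job configuration in the branch that handles a single source table (the else branch walks the leaf tables of a union expression) once the reader has passed the sortedness check; before this change that branch verified the table but never marked the conf as sorted. Below is a minimal driver-side sketch of the kind of read that reaches this branch. The class name and table path are hypothetical, and the setInputPaths/setProjection calls follow the Zebra MapReduce examples of this era rather than this patch; in the Pig-level test below the same request arrives through TableLoader's 'sorted' option.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.zebra.mapred.TableInputFormat;

    public class SortedTableReadSketch {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setJobName("zebra-sorted-read");
        jobConf.setInputFormat(TableInputFormat.class);
        // One sorted Zebra basic table as input (hypothetical path).
        TableInputFormat.setInputPaths(jobConf, new Path("/user/demo/table1"));
        // Read only the columns the join needs.
        TableInputFormat.setProjection(jobConf, "a,b,c");
        // When the caller additionally asks for a globally sorted view, the
        // branch patched above now calls setSorted(conf) after verifying the
        // table, so record readers built from this conf take the sorted
        // scan path.
      }
    }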
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java?rev=889750&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java Fri Dec 11 18:27:04 2009
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeJoin2 {
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ private static Configuration conf;
+ private static FileSystem fs;
+ final static int numsBatch = 4;
+ final static int numsInserters = 1;
+ static Path pathWorking;
+ static Path pathTable1;
+ static Path pathTable2;
+ final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,r1:record(f1:string, f2:string),m1:map(string)";
+ final static String STR_SCHEMA2 = "m1:map(string),r1:record(f1:string, f2:string),f:bytes,e:string,d:double,c:long,b:float,a:int";
+
+ final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [r1.f1]; [m1#{a}]";
+ final static String STR_STORAGE2 = "[a];[b]; [c]; [e]; [f]; [r1.f1]; [m1#{a}]";
+ static int t1 =0;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ fs = cluster.getFileSystem();
+
+ conf = new Configuration();
+ pathWorking = fs.getWorkingDirectory();
+ pathTable1 = new Path(pathWorking, "table1");
+ pathTable2 = new Path(pathWorking, "table2");
+ System.out.println("pathTable1 =" + pathTable1);
+ createFirstTable();
+ createSecondTable();
+ }
+ public static void createFirstTable() throws IOException, ParseException {
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable1, STR_SCHEMA1,
+ STR_STORAGE1, conf);
+ Schema schema = writer.getSchema();
+ //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+ Tuple tupRecord1;
+ try {
+ tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Map<String, String> m1 = new HashMap<String, String>();
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tupRecord1);
+ TypesUtils.resetTuple(tuple);
+ m1.clear();
+ try {
+ // first row of the table , the biggest row
+ if (i == 0 && b == 0) {
+ tuple.set(0, 100);
+ tuple.set(1, 100.1f);
+ tuple.set(2, 100L);
+ tuple.set(3, 50e+2);
+ tuple.set(4, "something2");
+ tuple.set(5, new DataByteArray("something2"));
+ }
+ // the middle + 1 row of the table, the smallest row
+ else if (i == 0 && b == (numsBatch / 2)) {
+ tuple.set(0, -100);
+ tuple.set(1, -100.1f);
+ tuple.set(2, -100L);
+ tuple.set(3, -50e+2);
+ tuple.set(4, "something1");
+ tuple.set(5, new DataByteArray("something1"));
+ }
+ else {
+ Float f = 1.1f;
+ long l = 11;
+ double d = 1.1;
+ tuple.set(0, b);
+ tuple.set(1, f);
+ tuple.set(2, l);
+ tuple.set(3, d);
+ tuple.set(4, "something1");
+ tuple.set(5, new DataByteArray("something1"));
+ }
+ // insert record
+ tupRecord1.set(0, "" + b);
+ tupRecord1.set(1, "" + b);
+ tuple.set(6, tupRecord1);
+
+ // insert map
+ m1.put("a", "" + b);
+ m1.put("b", "" + b);
+ m1.put("c", "" + b);
+ tuple.set(7, m1);
+
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ inserters[i].insert(new BytesWritable(("key_" + b).getBytes()), tuple);
+
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+
+
+ //check table is setup correctly
+ String projection = new String("a,b,c,d,e,f,r1,m1");
+
+ BasicTable.Reader reader = new BasicTable.Reader(pathTable1, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getValue(RowValue);
+ System.out.println("rowvalue size:"+RowValue.size());
+ System.out.println("read a : " + RowValue.get(0).toString());
+ System.out.println("read string: " + RowValue.get(1).toString());
+
+ scanner.advance();
+ scanner.getValue(RowValue);
+ System.out.println("read float in 2nd row: "+ RowValue.get(1).toString());
+ System.out.println("done insert table");
+
+ reader.close();
+
+ }
+ public static void createSecondTable() throws IOException, ParseException {
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable2, STR_SCHEMA2,
+ STR_STORAGE2, conf);
+ Schema schema = writer.getSchema();
+ //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+ Tuple tupRecord1;
+ try {
+ tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+ .getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Map<String, String> m1 = new HashMap<String, String>();
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tupRecord1);
+ TypesUtils.resetTuple(tuple);
+ m1.clear();
+
+ try {
+ // first row of the table , the biggest row
+ if (i == 0 && b == 0) {
+ tuple.set(7, 100);
+ tuple.set(6, 100.1f);
+ tuple.set(5, 100L);
+ tuple.set(4, 50e+2);
+ tuple.set(3, "something");
+ tuple.set(2, new DataByteArray("something"));
+
+ }
+ // the middle +1 row of the table, the smallest row
+ else if (i == 0 && b == (numsBatch / 2)) {
+ tuple.set(7, -100);
+ tuple.set(6, -100.1f);
+ tuple.set(5, -100L);
+ tuple.set(4, -50e+2);
+ tuple.set(3, "so");
+ tuple.set(2, new DataByteArray("so"));
+
+ }
+
+ else {
+ Float f = 2.1f;
+ long l = 12;
+ double d = 2.1;
+ tuple.set(7, b*2);
+ tuple.set(6, f);
+ tuple.set(5, l);
+ tuple.set(4, d);
+ tuple.set(3, "somee");
+ tuple.set(2, new DataByteArray("somee"));
+ }
+
+ // insert record
+ tupRecord1.set(0, "" + b);
+ tupRecord1.set(1, "" + b);
+ tuple.set(1, tupRecord1);
+
+ // insert map
+
+ m1.put("a", "" + b);
+ m1.put("b", "" + b);
+ m1.put("c", "" + b);
+ tuple.set(0, m1);
+
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+
+ inserters[i].insert(new BytesWritable(("key" + b).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+
+
+
+
+ //check table is setup correctly
+ String projection = new String("a,b,c,d,e,f,r1,m1");
+
+ BasicTable.Reader reader = new BasicTable.Reader(pathTable2, conf);
+ reader.setProjection(projection);
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getValue(RowValue);
+ System.out.println("rowvalue size:"+RowValue.size());
+ System.out.println("read a : " + RowValue.get(7).toString());
+ System.out.println("read string: " + RowValue.get(6).toString());
+
+ scanner.advance();
+ scanner.getValue(RowValue);
+ System.out.println("read float in 2nd row: "+ RowValue.get(6).toString());
+ System.out.println("done insert table");
+
+
+ reader.close();
+
+ }
+
+ public static void sortTable(Path tablePath, String sortkey){
+
+ }
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+
+ public void verify(Iterator<Tuple> it3) throws ExecException {
+ int row = 0;
+ Tuple RowValue3 = null;
+ while (it3.hasNext()) {
+ RowValue3 = it3.next();
+ Assert.assertEquals(16, RowValue3.size());
+ row++;
+
+ if (row == 1) {
+ // smallest row, the middle row of original table
+ Assert.assertEquals(-100, RowValue3.get(0));// a
+ Assert.assertEquals(-100.1f, RowValue3.get(1)); // b
+ Assert.assertEquals(-100L, RowValue3.get(2)); // c
+ Assert.assertEquals(-5000.0, RowValue3.get(3)); // d
+ Assert.assertEquals("so", RowValue3.get(4)); // e
+ Assert.assertEquals("so", RowValue3.get(5).toString());// f
+ Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(6))
+ .get(0));// r
+ Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(6))
+ .get(1));// r
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+ .get("a"));// m
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+ .get("b"));// m
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+ .get("c"));// m
+ Assert.assertEquals(-100, RowValue3.get(15)); // a
+ Assert.assertEquals(-100.1f, RowValue3.get(14)); // b
+ Assert.assertEquals(-100L, RowValue3.get(13)); // c
+ Assert.assertEquals(-5000.0, RowValue3.get(12)); // d
+ Assert.assertEquals("so", RowValue3.get(11)); // e
+ Assert.assertEquals("so", RowValue3.get(10).toString());// f
+ Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(9))
+ .get(0));// r
+ Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(9))
+ .get(1));// r
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+ .get("a"));// m
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+ .get("b"));// m
+ Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+ .get("c"));// m
+ }
+
+ // largest row, the first row of the original table
+ if (row == 2) {
+ /*
+ Assert.assertEquals(100, RowValue3.get(0));// a
+ Assert.assertEquals(100.1f, RowValue3.get(1)); // b
+ Assert.assertEquals(100L, RowValue3.get(2)); // c
+ Assert.assertEquals(5000.0, RowValue3.get(3)); // d
+ Assert.assertEquals("something", RowValue3.get(4)); // e
+ Assert.assertEquals("something", RowValue3.get(5).toString());// f
+ Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(6))
+ .get(0));// r
+ Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(6))
+ .get(1));// r
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+ .get("a"));// m
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+ .get("b"));// m
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+ .get("c"));// m
+ Assert.assertEquals(100, RowValue3.get(15)); // a
+ Assert.assertEquals(100.1f, RowValue3.get(14)); // b
+ Assert.assertEquals(100L, RowValue3.get(13)); // c
+ Assert.assertEquals(5000.0, RowValue3.get(12)); // d
+ Assert.assertEquals("something", RowValue3.get(11)); // e
+ Assert.assertEquals("something", RowValue3.get(10).toString());// f
+ Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(9)).get(0));// r
+ Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(9)).get(1));// r
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("a"));// m
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("b"));// m
+ Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("c"));// m
+ */
+ }
+ }
+ Assert.assertEquals(0, row);
+ }
+
+ public Iterator<Tuple> joinTable(String table1, String table2, String sortkey1, String sortkey2) throws IOException {
+ String query1 = "records1 = LOAD '" + this.pathTable1.toString()
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ System.out.println("query1:" + query1);
+ pigServer.registerQuery(query1);
+
+ String query2 = "records2 = LOAD '" + this.pathTable2.toString()
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ System.out.println("query2:" + query2);
+ pigServer.registerQuery(query2);
+
+ /* Iterator<Tuple> it_before_order = pigServer.openIterator("records");
+ int row_before_order = 0;
+ Tuple RowValue_before_order = null;
+ while (it_before_order.hasNext()) {
+ RowValue_before_order = it_before_order.next();
+ row_before_order++;
+ System.out.println("row : " + row_before_order + " field f value: "
+ + RowValue_before_order.get(5));
+ }
+ System.out.println("total row for orig table before ordered:"
+ + row_before_order);*/
+ String orderby1 = "sort1 = ORDER records1 BY " + sortkey1 + " ;";
+ String orderby2 = "sort2 = ORDER records2 BY " + sortkey2 + " ;";
+ pigServer.registerQuery(orderby1);
+ pigServer.registerQuery(orderby2);
+
+ /*Iterator<Tuple> it_after_order = pigServer.openIterator("srecs");
+ int row_after_order = 0;
+ Tuple RowValue_after_order = null;
+ while (it_after_order.hasNext()) {
+ RowValue_after_order = it_after_order.next();
+ row_after_order++;
+ System.out.println("row : " + row_after_order + " field b value: "
+ + RowValue_after_order.get(1));
+ }
+ System.out.println("total row for orig table after ordered:"
+ + row_after_order);*/
+ // Path newPath = new Path(getCurrentMethodName());
+
+ /*
+ * Table1 creation
+ */
+ this.t1++;
+
+ String table1path = this.pathTable1.toString() + Integer.toString(this.t1);
+ pigServer.store("sort1", table1path, TableStorer.class.getCanonicalName()
+ + "('[a, b, c]; [d, e, f, r1, m1]')");
+
+ String query3 = "records1 = LOAD '"
+ + table1path
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f,
r1, m1', 'sorted');";
+
+ System.out.println("query3:" + query3);
+ pigServer.registerQuery(query3);
+
+ String foreach = "records11 = foreach records1 generate a as a, b as b, c as c, d as d, e as e, f as f, r1 as r1, m1 as m1;";
+ pigServer.registerQuery(foreach);
+ /* Iterator<Tuple> it_ordered = pigServer.openIterator("records1");
+ int row_ordered = 0;
+ Tuple RowValue_ordered = null;
+ while (it_ordered.hasNext()) {
+ RowValue_ordered = it_ordered.next();
+ row_ordered++;
+ System.out.println("row : " + row_ordered + " field a value: "
+ + RowValue_ordered.get(0));
+ }
+ System.out.println("total row for table 1 after ordered:" + row_ordered);*/
+
+ /*
+ * Table2 creation
+ */
+ this.t1++;
+ String table2path = this.pathTable2.toString() + Integer.toString(this.t1);
+ pigServer.store("sort2", table2path, TableStorer.class.getCanonicalName()
+ + "('[a, b, c]; [d,e,f,r1,m1]')");
+
+ String query4 = "records2 = LOAD '" + table2path
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query4);
+
+
+ String filter = "records22 = FILTER records2 BY a == '1.9';";
+ pigServer.registerQuery(filter);
+ /*Iterator<Tuple> it_ordered2 = pigServer.openIterator("records2");
+ int row_ordered2 = 0;
+ Tuple RowValue_ordered2 = null;
+ while (it_ordered2.hasNext()) {
+ RowValue_ordered2 = it_ordered2.next();
+ row_ordered2++;
+ System.out.println("row for table 2 after ordereed: " + row_ordered2
+ + " field a value: " + RowValue_ordered2.get(0));
+ }
+
+ System.out.println("total row for table 2:" + row_ordered2);
+ */
+ String join = "joinRecords = JOIN records11 BY " + "(" + sortkey1 + ")"
+ + " , records2 BY " + "("+ sortkey2 + ")"+" USING \"merge\";";
+ //TODO: can not use records22
+ pigServer.registerQuery(join);
+
+ // check JOIN content
+ Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+ return it3;
+ }
+
+ //@Test
+ public void test1() throws ExecException, IOException {
+ /*
+ * join key: single integer column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a","a" );
+ verify (it3);
+ }
+
+ //@Test
+ public void test2() throws ExecException, IOException {
+ /*
+ * join key: single float column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "b","b" );
+ verify(it3);
+ }
+
+ @Test
+ public void test3() throws ExecException, IOException {
+ /*
+ * join key: single string column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "e","e" );
+ verify(it3);
+ }
+
+ //@Test
+ public void test4() throws ExecException, IOException {
+ /*
+ * join key: single byte column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "f","f" );
+ verify(it3);
+ }
+
+ //@Test
+ public void test5() throws ExecException, IOException {
+ /*
+ * join key: single double column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "d","d" );
+ verify(it3);
+ }
+
+ //@Test
+ public void test6() throws ExecException, IOException {
+ /*
+ * join key: single long column
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "c","c" );
+ verify(it3);
+ }
+
+ //@Test
+ public void test7() throws ExecException, IOException {
+ /*
+ * 2 join keys: integer and float
+ */
+ System.out.println ("helloo");
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,b","a,b" );
+ verify(it3);
+ }
+
+ //@Test
+ public void test8() throws ExecException, IOException {
+ /*
+ * multiple join keys: integer, float, long, double, string, bytes
+ */
+
+ // Failing with bytes (known bug)
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,b,c,d,e,f","a,b,c,d,e,f" );
+ verify(it3);
+ }
+
+ //@Test(expected = IOException.class)
+ public void test9a() throws ExecException, IOException {
+ /*
+ * Negative test case, one join key is not primitive type which is a record
+ * 2 join keys: integer and record
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,r1,e","a,r1,e");
+ }
+
+ //@Test(expected = IOException.class)
+ public void test9b() throws ExecException, IOException {
+ /*
+ * Negative test case, one join key is not primitive type which is a record
+ * 2 join keys: integer and map
+ */
+ Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,m1,e","a,m1,e");
+ }
+
+}