Author: yanz
Date: Fri Dec 11 18:27:04 2009
New Revision: 889750

URL: http://svn.apache.org/viewvc?rev=889750&view=rev
Log:
PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)

Added:
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java
Modified:
    hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Fri Dec 11 18:27:04 2009
@@ -38,6 +38,9 @@
   OPTIMIZATIONS
 
   BUG FIXES
+
+    PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)
+
     PIG-1074 Zebra store function should allow '::' in column names in output
            schema (yanz via gates)
 

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Fri Dec 11 18:27:04 2009
@@ -1137,12 +1137,12 @@
 
       @Override
       public boolean seekTo(BytesWritable key) throws IOException {
-        boolean first = false, cur;
+        boolean first = false, cur, firstset = false;
         for (int nx = 0; nx < cgScanners.length; nx++) {
           if (cgScanners[nx] == null)
             continue;
           cur = cgScanners[nx].seekTo(key);
-          if (nx != 0) {
+          if (firstset) {
             if (cur != first) {
               throw new IOException(
                   "seekTo() failed: Column Groups are not evenly positioned.");
@@ -1150,6 +1150,7 @@
           }
           else {
             first = cur;
+            firstset = true;
           }
         }
         return first;

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Fri Dec 11 18:27:04 2009
@@ -1095,7 +1095,7 @@
         int index =
             cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
                 comparator);
-        if (index > endIndex) {
+        if (index >= endIndex) {
           seekToEnd();
           return false;
         }

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=889750&r1=889749&r2=889750&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Fri Dec 11 18:27:04 2009
@@ -373,6 +373,7 @@
                 {
                         throw new IOException("The table is not properly sorted");
                 }
+                setSorted(conf);
         } else {
                 List<LeafTableInfo> leaves = expr.getLeafTables(null);
                 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )

Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java?rev=889750&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin2.java Fri Dec 11 18:27:04 2009
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeJoin2 {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Configuration conf;
+  private static FileSystem fs;
+  final static int numsBatch = 4;
+  final static int numsInserters = 1;
+  static Path pathWorking;
+  static Path pathTable1;
+  static Path pathTable2;
+  final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,r1:record(f1:string, f2:string),m1:map(string)";
+  final static String STR_SCHEMA2 = "m1:map(string),r1:record(f1:string, f2:string),f:bytes,e:string,d:double,c:long,b:float,a:int";
+
+  final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [r1.f1]; [m1#{a}]";
+  final static String STR_STORAGE2 = "[a];[b]; [c]; [e]; [f]; [r1.f1]; [m1#{a}]";
+  static int t1 =0;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    fs = cluster.getFileSystem();
+
+    conf = new Configuration();
+    pathWorking = fs.getWorkingDirectory();
+    pathTable1 = new Path(pathWorking, "table1");
+    pathTable2 = new Path(pathWorking, "table2");
+    System.out.println("pathTable1 =" + pathTable1);
+    createFirstTable();
+    createSecondTable();
+  }
+  public static void createFirstTable() throws IOException, ParseException {
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable1, STR_SCHEMA1,
+        STR_STORAGE1, conf);
+    Schema schema = writer.getSchema();
+    //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+    Tuple tuple = TypesUtils.createTuple(schema);
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    Map<String, String> m1 = new HashMap<String, String>();
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tupRecord1);
+        TypesUtils.resetTuple(tuple);
+        m1.clear();
+        try {
+          // first row of the table , the biggest row
+          if (i == 0 && b == 0) {
+            tuple.set(0, 100);
+            tuple.set(1, 100.1f);
+            tuple.set(2, 100L);
+            tuple.set(3, 50e+2);
+            tuple.set(4, "something2");
+            tuple.set(5, new DataByteArray("something2"));
+          }
+          // the middle + 1 row of the table, the smallest row
+          else if (i == 0 && b == (numsBatch / 2)) {
+            tuple.set(0, -100);
+            tuple.set(1, -100.1f);
+            tuple.set(2, -100L);
+            tuple.set(3, -50e+2);
+            tuple.set(4, "something1");
+            tuple.set(5, new DataByteArray("something1"));
+          }
+          else {
+            Float f = 1.1f;
+            long l = 11;
+            double d = 1.1;
+            tuple.set(0, b);
+            tuple.set(1, f);
+            tuple.set(2, l);
+            tuple.set(3, d);
+            tuple.set(4, "something1");
+            tuple.set(5, new DataByteArray("something1"));
+          }
+          // insert record
+          tupRecord1.set(0, "" + b);
+          tupRecord1.set(1, "" + b);
+          tuple.set(6, tupRecord1);
+
+          // insert map
+          m1.put("a", "" + b);
+          m1.put("b", "" + b);
+          m1.put("c", "" + b);
+          tuple.set(7, m1);
+
+        } catch (ExecException e) {
+          e.printStackTrace();
+        }
+       inserters[i].insert(new BytesWritable(("key_" + b).getBytes()), tuple);
+
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+
+
+    //check table is setup correctly
+    String projection = new String("a,b,c,d,e,f,r1,m1");
+
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable1, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getValue(RowValue);
+    System.out.println("rowvalue size:"+RowValue.size());
+    System.out.println("read a : " + RowValue.get(0).toString());
+    System.out.println("read string: " + RowValue.get(1).toString());
+
+    scanner.advance();
+    scanner.getValue(RowValue);
+    System.out.println("read float in 2nd row: "+ RowValue.get(1).toString());
+    System.out.println("done insert table");
+
+    reader.close();
+
+  }
+  public static void createSecondTable() throws IOException, ParseException {
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable2, STR_SCHEMA2,
+        STR_STORAGE2, conf);
+    Schema schema = writer.getSchema();
+    //System.out.println("typeName" + schema.getColumn("a").type.pigDataType());
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+    Tuple tupRecord1;
+    try {
+      tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    Map<String, String> m1 = new HashMap<String, String>();
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tupRecord1);
+        TypesUtils.resetTuple(tuple);
+        m1.clear();
+
+        try {
+           // first row of the table , the biggest row
+          if (i == 0 && b == 0) {
+            tuple.set(7, 100);
+            tuple.set(6, 100.1f);
+            tuple.set(5, 100L);
+            tuple.set(4, 50e+2);
+            tuple.set(3, "something");
+            tuple.set(2, new DataByteArray("something"));
+
+          }
+          // the middle +1 row of the table, the smallest row
+          else if (i == 0 && b == (numsBatch / 2)) {
+            tuple.set(7, -100);
+            tuple.set(6, -100.1f);
+            tuple.set(5, -100L);
+            tuple.set(4, -50e+2);
+            tuple.set(3, "so");
+            tuple.set(2, new DataByteArray("so"));
+
+          }
+
+          else {
+            Float f = 2.1f;
+            long l = 12;
+            double d = 2.1;
+            tuple.set(7, b*2);
+            tuple.set(6, f);
+            tuple.set(5, l);
+            tuple.set(4, d);
+            tuple.set(3, "somee");
+            tuple.set(2, new DataByteArray("somee"));
+          }
+
+          // insert record
+          tupRecord1.set(0, "" + b);
+          tupRecord1.set(1, "" + b);
+          tuple.set(1, tupRecord1);
+
+          // insert map
+
+          m1.put("a", "" + b);
+          m1.put("b", "" + b);
+          m1.put("c", "" + b);
+          tuple.set(0, m1);
+
+        } catch (ExecException e) {
+          e.printStackTrace();
+        }
+
+        inserters[i].insert(new BytesWritable(("key" + b).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+
+
+
+
+    //check table is setup correctly
+    String projection = new String("a,b,c,d,e,f,r1,m1");
+
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable2, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getValue(RowValue);
+    System.out.println("rowvalue size:"+RowValue.size());
+    System.out.println("read a : " + RowValue.get(7).toString());
+    System.out.println("read string: " + RowValue.get(6).toString());
+
+    scanner.advance();
+    scanner.getValue(RowValue);
+    System.out.println("read float in 2nd row: "+ RowValue.get(6).toString());
+    System.out.println("done insert table");
+
+
+    reader.close();
+
+  }
+
+  public static void sortTable(Path tablePath, String sortkey){
+
+  }
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+
+  public void verify(Iterator<Tuple> it3) throws ExecException {
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      RowValue3 = it3.next();
+      Assert.assertEquals(16, RowValue3.size());
+      row++;
+
+      if (row == 1) {
+        // smallest row,   the middle row of original table
+        Assert.assertEquals(-100, RowValue3.get(0));// a
+        Assert.assertEquals(-100.1f, RowValue3.get(1)); // b
+        Assert.assertEquals(-100L, RowValue3.get(2)); // c
+        Assert.assertEquals(-5000.0, RowValue3.get(3)); // d
+        Assert.assertEquals("so", RowValue3.get(4)); // e
+        Assert.assertEquals("so", RowValue3.get(5).toString());// f
+        Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(6))
+            .get(0));// r
+        Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(6))
+            .get(1));// r
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+            .get("a"));// m
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+            .get("b"));// m
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(7))
+            .get("c"));// m
+        Assert.assertEquals(-100, RowValue3.get(15)); // a
+        Assert.assertEquals(-100.1f, RowValue3.get(14)); // b
+        Assert.assertEquals(-100L, RowValue3.get(13)); // c
+        Assert.assertEquals(-5000.0, RowValue3.get(12)); // d
+        Assert.assertEquals("so", RowValue3.get(11)); // e
+        Assert.assertEquals("so", RowValue3.get(10).toString());// f
+        Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(9))
+            .get(0));// r
+        Assert.assertEquals("" + numsBatch / 2, ((Tuple) RowValue3.get(9))
+            .get(1));// r
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+            .get("a"));// m
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+            .get("b"));// m
+        Assert.assertEquals("" + numsBatch / 2, ((Map) RowValue3.get(8))
+            .get("c"));// m
+      }
+
+      // largest row, the first row of the original table
+      if (row == 2) {
+        /*
+        Assert.assertEquals(100, RowValue3.get(0));// a
+        Assert.assertEquals(100.1f, RowValue3.get(1)); // b
+        Assert.assertEquals(100L, RowValue3.get(2)); // c
+        Assert.assertEquals(5000.0, RowValue3.get(3)); // d
+        Assert.assertEquals("something", RowValue3.get(4)); // e
+        Assert.assertEquals("something", RowValue3.get(5).toString());// f
+        Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(6))
+            .get(0));// r
+        Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(6))
+            .get(1));// r
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+            .get("a"));// m
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+            .get("b"));// m
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(7))
+            .get("c"));// m
+        Assert.assertEquals(100, RowValue3.get(15)); // a
+        Assert.assertEquals(100.1f, RowValue3.get(14)); // b
+        Assert.assertEquals(100L, RowValue3.get(13)); // c
+        Assert.assertEquals(5000.0, RowValue3.get(12)); // d
+        Assert.assertEquals("something", RowValue3.get(11)); // e
+        Assert.assertEquals("something", RowValue3.get(10).toString());// f
+        Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(9)).get(0));// r
+        Assert.assertEquals("" + 0, ((Tuple) RowValue3.get(9)).get(1));// r
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("a"));// m
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("b"));// m
+        Assert.assertEquals("" + 0, ((Map) RowValue3.get(8)).get("c"));// m
+        */
+      }
+    }
+    Assert.assertEquals(0, row);
+  }
+
+  public Iterator<Tuple> joinTable(String table1, String table2, String sortkey1, String sortkey2) throws IOException {
+    String query1 = "records1 = LOAD '" + this.pathTable1.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("query1:" + query1);
+    pigServer.registerQuery(query1);
+
+    String query2 = "records2 = LOAD '" + this.pathTable2.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("query2:" + query2);
+    pigServer.registerQuery(query2);
+
+   /* Iterator<Tuple> it_before_order = pigServer.openIterator("records");
+    int row_before_order = 0;
+    Tuple RowValue_before_order = null;
+    while (it_before_order.hasNext()) {
+      RowValue_before_order = it_before_order.next();
+      row_before_order++;
+      System.out.println("row : " + row_before_order + " field f value: "
+          + RowValue_before_order.get(5));
+    }
+    System.out.println("total row for orig table before ordered:"
+        + row_before_order);*/
+    String orderby1 = "sort1 = ORDER records1 BY " + sortkey1 + " ;";
+    String orderby2 = "sort2 = ORDER records2 BY " + sortkey2 + " ;";
+    pigServer.registerQuery(orderby1);
+    pigServer.registerQuery(orderby2);
+
+    /*Iterator<Tuple> it_after_order = pigServer.openIterator("srecs");
+    int row_after_order = 0;
+    Tuple RowValue_after_order = null;
+    while (it_after_order.hasNext()) {
+      RowValue_after_order = it_after_order.next();
+      row_after_order++;
+      System.out.println("row : " + row_after_order + " field b value: "
+          + RowValue_after_order.get(1));
+    }
+    System.out.println("total row for orig table after ordered:"
+        + row_after_order);*/
+    // Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    this.t1++;
+
+    String table1path = this.pathTable1.toString() + Integer.toString(this.t1);
+    pigServer.store("sort1", table1path, TableStorer.class.getCanonicalName()
+        + "('[a, b, c]; [d, e, f, r1, m1]')");
+
+    String query3 = "records1 = LOAD '"
+        + table1path
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, r1, m1', 'sorted');";
+
+    System.out.println("query3:" + query3);
+    pigServer.registerQuery(query3);
+
+    String foreach = "records11 = foreach records1 generate a as a, b as b, c as c, d as d, e as e, f as f, r1 as r1, m1 as m1;";
+    pigServer.registerQuery(foreach);
+   /* Iterator<Tuple> it_ordered = pigServer.openIterator("records1");
+    int row_ordered = 0;
+    Tuple RowValue_ordered = null;
+    while (it_ordered.hasNext()) {
+      RowValue_ordered = it_ordered.next();
+      row_ordered++;
+      System.out.println("row : " + row_ordered + " field a value: "
+          + RowValue_ordered.get(0));
+    }
+    System.out.println("total row for table 1 after ordered:" + row_ordered);*/
+
+    /*
+     * Table2 creation
+     */
+    this.t1++;
+    String table2path = this.pathTable2.toString() + Integer.toString(this.t1);
+    pigServer.store("sort2", table2path, TableStorer.class.getCanonicalName()
+        + "('[a, b, c]; [d,e,f,r1,m1]')");
+
+    String query4 = "records2 = LOAD '" + table2path
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+
+    String filter = "records22 = FILTER records2 BY a == '1.9';";
+    pigServer.registerQuery(filter);
+    /*Iterator<Tuple> it_ordered2 = pigServer.openIterator("records2");
+    int row_ordered2 = 0;
+    Tuple RowValue_ordered2 = null;
+    while (it_ordered2.hasNext()) {
+      RowValue_ordered2 = it_ordered2.next();
+      row_ordered2++;
+      System.out.println("row for table 2 after ordereed: " + row_ordered2
+          + " field a value: " + RowValue_ordered2.get(0));
+    }
+
+    System.out.println("total row for table 2:" + row_ordered2);
+    */
+    String join = "joinRecords = JOIN records11 BY " + "(" + sortkey1 + ")"
+        + " , records2 BY " + "("+ sortkey2 + ")"+" USING \"merge\";";
+   //TODO: can not use records22
+      pigServer.registerQuery(join);
+
+    // check JOIN content
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    return it3;
+  }
+
+  //@Test
+  public void test1() throws ExecException, IOException {
+    /*
+     * join key: single integer column
+     */
+    Iterator<Tuple> it3 =  joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a","a" );
+    verify (it3);
+  }
+
+  //@Test
+  public void test2() throws ExecException, IOException {
+    /*
+     * join key: single float column
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "b","b" );
+    verify(it3);
+  }
+
+  @Test
+  public void test3() throws ExecException, IOException {
+    /*
+     * join key: single string column
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "e","e" );
+    verify(it3);
+  }
+
+  //@Test
+  public void test4() throws ExecException, IOException {
+    /*
+     * join key: single byte column
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "f","f" );
+    verify(it3);
+  }
+
+  //@Test
+  public void test5() throws ExecException, IOException {
+    /*
+     * join key: single double column
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "d","d" );
+    verify(it3);
+  }
+
+  //@Test
+  public void test6() throws ExecException, IOException {
+    /*
+     * join key: single long column
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "c","c" );
+    verify(it3);
+  }
+
+  //@Test
+  public void test7() throws ExecException, IOException {
+    /*
+     *  2 join keys: integer and float
+     */
+    System.out.println ("helloo");
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,b","a,b" );
+    verify(it3);
+  }
+
+  //@Test
+  public void test8() throws ExecException, IOException {
+    /*
+     * multiple join keys: integer, float, long, double, string, bytes
+     */
+
+       // Failing with bytes (known bug)
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,b,c,d,e,f","a,b,c,d,e,f" );
+    verify(it3);
+  }
+
+  //@Test(expected = IOException.class)
+  public void test9a() throws ExecException, IOException {
+    /*
+     * Negative test case, one join key is not primitive type which is a record
+     * 2 join keys: integer and record
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,r1,e","a,r1,e");
+  }
+
+  //@Test(expected = IOException.class)
+  public void test9b() throws ExecException, IOException {
+    /*
+     * Negative test case, one join key is not primitive type which is a record
+     * 2 join keys: integer and map
+     */
+    Iterator<Tuple> it3 = joinTable(this.pathTable1.toString(), this.pathTable2.toString(), "a,m1,e","a,m1,e");
+  }
+
+}


Reply via email to