Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java?rev=834285&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
 Tue Nov 10 00:25:21 2009
@@ -0,0 +1,828 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveVariableTable {
+       
+       // Schema and storage strings for table1. Both are handed to createTable()
+       // in setUp(); the storage string is also reused when sorted copies are stored.
+       final static String TABLE1_SCHEMA = 
"a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+       final static String TABLE1_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";
+       
+       // table2 uses the same schema and storage strings as table1.
+       final static String TABLE2_SCHEMA = 
"a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+       final static String TABLE2_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";
+       
+       // table3 lists its columns in a different order and adds int1.
+       final static String TABLE3_SCHEMA = 
"b:float,d:double,c:long,e:string,f:bytes,int1:int";
+       final static String TABLE3_STORAGE = "[b,d]; [c]; [e]; [f,int1]";
+       
+       // table4 shares only column e with the other tables.
+       final static String TABLE4_SCHEMA = "e:string,str3:string,str4:string";
+       final static String TABLE4_STORAGE = "[e,str3,str4]";
+       
+       // NOTE(review): neither counter is referenced in the part of the file visible
+       // here -- confirm they are still used further down.
+       static int fileId = 0;
+       static int sortId = 0;
+       
+       // execType decides in setUp() whether a MiniCluster-backed or a local
+       // PigServer is created.
+       protected static ExecType execType = ExecType.MAPREDUCE;
+       private static MiniCluster cluster;
+       protected static PigServer pigServer;
+       // Result of the most recent pigServer.store() call made by a test.
+       protected static ExecJob pigJob;
+       
+       // Locations of the four source tables, resolved against the working
+       // directory in setUp().
+       private static Path pathTable1;
+       private static Path pathTable2;
+       private static Path pathTable3;
+       private static Path pathTable4;
+       // Maps the Pig alias of each source table ("table1".."table4") to its storage string.
+       private static HashMap<String, String> tableStorage;
+       
+       // Configuration passed to BasicTable.Writer in createTable().
+       private static Configuration conf;
+       
+       // Raw row data written to the source tables; tests compare row counts
+       // against the lengths of these arrays.
+       private static Object[][] table1;
+       private static Object[][] table2;
+       private static Object[][] table3;
+       private static Object[][] table4;
+       
+       // Map-column values shared by every row of table1 (m1) and table2 (m2).
+       private static Map<String, String> m1;
+       private static Map<String, String> m2;
+       
+       @BeforeClass
+       public static void setUp() throws Exception {
+               if (System.getProperty("hadoop.log.dir") == null) {
+                       String base = new File(".").getPath(); // 
getAbsolutePath();
+                       System.setProperty("hadoop.log.dir", new 
Path(base).toString() + "./logs");
+               }
+
+               if (execType == ExecType.MAPREDUCE) {
+                       cluster = MiniCluster.buildCluster();
+                       pigServer = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+               } else {
+                       pigServer = new PigServer(ExecType.LOCAL);
+               }
+               
+               conf = new Configuration();
+               FileSystem fs = cluster.getFileSystem();
+               Path pathWorking = fs.getWorkingDirectory();
+               
+               pathTable1 = new Path(pathWorking, "table1");
+               pathTable2 = new Path(pathWorking, "table2");
+               pathTable3 = new Path(pathWorking, "table3");
+               pathTable4 = new Path(pathWorking, "table4");
+               
+               // Create table1 data
+               m1 = new HashMap<String, String>();
+               m1.put("a","m1-a");
+               m1.put("b","m1-b");
+               
+               table1 = new Object[][]{
+                       {5,             -3.25f, 1001L,  51e+2,  "Zebra",        
new DataByteArray("Zebra"),             m1},
+                       {-1,    3.25f,  1000L,  50e+2,  "zebra",        new 
DataByteArray("zebra"),             m1},
+                       {1001,  100.0f, 1003L,  50e+2,  "Apple",        new 
DataByteArray("Apple"),             m1},
+                       {1001,  101.0f, 1001L,  50e+2,  "apple",        new 
DataByteArray("apple"),             m1},
+                       {1001,  50.0f,  1000L,  50e+2,  "Pig",          new 
DataByteArray("Pig"),               m1},
+                       {1001,  52.0f,  1001L,  50e+2,  "pig",          new 
DataByteArray("pig"),               m1},
+                       {1002,  28.0f,  1000L,  50e+2,  "Hadoop",       new 
DataByteArray("Hadoop"),    m1},
+                       {1000,  0.0f,   1002L,  52e+2,  "hadoop",       new 
DataByteArray("hadoop"),    m1} };
+               
+               // Create table1
+               createTable(pathTable1, TABLE1_SCHEMA, TABLE1_STORAGE, table1);
+               
+               // Create table2 data
+               m2 = new HashMap<String, String>();
+               m2.put("a","m2-a");
+               m2.put("b","m2-b");
+               
+               table2 = new Object[][] {
+                       {15,    56.0f,  1004L,  50e+2,  "green",        new 
DataByteArray("green"),             m2},
+                       {-1,    -99.0f, 1002L,  51e+2,  "orange",       new 
DataByteArray("orange"),    m2},
+                       {1001,  100.0f, 1003L,  55e+2,  "white",        new 
DataByteArray("white"),             m2},
+                       {1001,  102.0f, 1001L,  52e+2,  "purple",       new 
DataByteArray("purple"),    m2},
+                       {1001,  50.0f,  1008L,  52e+2,  "gray",         new 
DataByteArray("gray"),              m2},
+                       {1001,  53.0f,  1001L,  52e+2,  "brown",        new 
DataByteArray("brown"),             m2},
+                       {2000,  33.0f,  1006L,  52e+2,  "beige",        new 
DataByteArray("beige"),             m2} };
+               
+               // Create table2
+               createTable(pathTable2, TABLE2_SCHEMA, TABLE2_STORAGE, table2);
+               
+               // Create table3 data
+               table3 = new Object[][] {
+                       {3.25f,         51e+2,  1001L,  "string1",      new 
DataByteArray("green"),     100}, };
+               
+               // Create table3
+               createTable(pathTable3, TABLE3_SCHEMA, TABLE3_STORAGE, table3);
+               
+               // Create table4 data
+               table4 = new Object[][] {
+                       {"a 8", "Cupertino",    "California"},
+                       {"a 7", "San Jose",             "California"},
+                       {"a 6", "Santa Cruz",   "California"},
+                       {"a 5", "Las Vegas",    "Nevada"},
+                       {"a 4", "New York",             "New York"},
+                       {"a 3", "Phoenix",              "Arizona"},
+                       {"a 2", "Dallas",               "Texas"},
+                       {"a 1", "Reno",                 "Nevada"} };
+               
+               // Create table4
+               createTable(pathTable4, TABLE4_SCHEMA, TABLE4_STORAGE, table4);
+               
+               // Load tables
+               String query1 = "table1 = LOAD '" + pathTable1.toString() + "' 
USING org.apache.hadoop.zebra.pig.TableLoader();";
+               pigServer.registerQuery(query1);
+               String query2 = "table2 = LOAD '" + pathTable2.toString() + "' 
USING org.apache.hadoop.zebra.pig.TableLoader();";
+               pigServer.registerQuery(query2);
+               String query3 = "table3 = LOAD '" + pathTable3.toString() + "' 
USING org.apache.hadoop.zebra.pig.TableLoader();";
+               pigServer.registerQuery(query3);
+               String query4 = "table4 = LOAD '" + pathTable4.toString() + "' 
USING org.apache.hadoop.zebra.pig.TableLoader();";
+               pigServer.registerQuery(query4);
+               
+               // Map for table storage
+               tableStorage = new HashMap<String, String>();
+               tableStorage.put("table1", TABLE1_STORAGE);
+               tableStorage.put("table2", TABLE2_STORAGE);
+               tableStorage.put("table3", TABLE3_STORAGE);
+               tableStorage.put("table4", TABLE4_STORAGE);
+       }
+       
+       /**
+        * Writes the rows of {@code tableData} into a new BasicTable at {@code path}.
+        * <p>
+        * Each inner array is one row; its elements are set positionally on the
+        * tuple, and the row is inserted under the key {@code "key" + rowIndex}.
+        * The inserter and the writer are closed even when an insert fails.
+        *
+        * @param path          location of the table to create
+        * @param schemaString  schema string passed to the writer
+        * @param storageString storage string passed to the writer
+        * @param tableData     row data, one inner array per row
+        * @throws IOException  if creating, writing or closing the table fails
+        */
+       private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+                       throws IOException {
+               BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+               try {
+                       Schema schema = writer.getSchema();
+                       Tuple tuple = TypesUtils.createTuple(schema);
+                       TableInserter inserter = writer.getInserter("ins", false);
+                       try {
+                               for (int i = 0; i < tableData.length; ++i) {
+                                       // One tuple instance is reused; clear it before filling the next row.
+                                       TypesUtils.resetTuple(tuple);
+                                       for (int k = 0; k < tableData[i].length; ++k) {
+                                               tuple.set(k, tableData[i][k]);
+                                               System.out.println("DEBUG: setting tuple k=" + k + ", value=" + tableData[i][k]);
+                                       }
+                                       inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+                               }
+                       } finally {
+                               inserter.close();
+                       }
+               } finally {
+                       writer.close();
+               }
+       }
+       
+       /**
+        * Shuts down the Pig server created in setUp().
+        * <p>
+        * The null check keeps a failure early in setUp() (before the server was
+        * assigned) from being followed by a NullPointerException here.
+        *
+        * @throws Exception if the server fails to shut down
+        */
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (pigServer != null) {
+                       pigServer.shutdown();
+               }
+       }
+       
+       @Test
+       public void test_union_empty_left_table() throws ExecException, 
IOException {
+               //
+               // Test sorted union with two tables and left table is empty
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "a,b" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby2 = "sort2 = ORDER table2 BY " + "a,b" + " ;";
+               pigServer.registerQuery(orderby2);
+               
+               // Filter to remove all rows in table
+               String filter = "filtersort1 = FILTER sort1 BY a == -99 ;";
+               
+               pigServer.registerQuery(filter);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("filtersort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort2 = newPath.toString() + "2";
+               pigJob = pigServer.store("sort2", pathSort2, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE2_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort2
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('a,source_table,b,c,d,e,f,m1', 
'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, -1,   1,      -99.0f, 1002L,  51e+2,  
"orange",       new DataByteArray("orange"),m2);
+               addResultRow(resultTable, 15,   1,      56.0f,  1004L,  50e+2,  
"green",        new DataByteArray("green"),     m2);
+               addResultRow(resultTable, 1001, 1,      100.0f, 1003L,  55e+2,  
"white",        new DataByteArray("white"),     m2);
+               addResultRow(resultTable, 1001, 1,      102.0f, 1001L,  52e+2,  
"purple",       new DataByteArray("purple"),m2);
+               
+               addResultRow(resultTable, 1001, 1,      50.0f,  1008L,  52e+2,  
"gray",         new DataByteArray("gray"),      m2);
+               addResultRow(resultTable, 1001, 1,      53.0f,  1001L,  52e+2,  
"brown",        new DataByteArray("brown"),     m2);
+               addResultRow(resultTable, 2000, 1,      33.0f,  1006L,  52e+2,  
"beige",        new DataByteArray("beige"),     m2);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(numbRows, table2.length);
+       }
+       
+       @Test
+       public void test_union_empty_right_table() throws ExecException, 
IOException {
+               //
+               // Test sorted union with two tables and right table is empty
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               // Filter to remove all rows in table
+               String filter = "filtersort4 = FILTER sort4 BY e == 
'teststring' ;";
+               
+               pigServer.registerQuery(filter);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("sort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + "4";
+               pigJob = pigServer.store("filtersort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str3,str4',
 'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, "Apple",      0,      1001,   100.0f, 
1003L,  50e+2,  new DataByteArray("Apple"),     m1,     null,   null);
+               addResultRow(resultTable, "Hadoop",     0,      1002,   28.0f,  
1000L,  50e+2,  new DataByteArray("Hadoop"),m1, null,   null);
+               addResultRow(resultTable, "Pig",        0,      1001,   50.0f,  
1000L,  50e+2,  new DataByteArray("Pig"),       m1,     null,   null);
+               addResultRow(resultTable, "Zebra",      0,      5,              
-3.25f, 1001L,  51e+2,  new DataByteArray("Zebra"),     m1,     null,   null);
+               
+               addResultRow(resultTable, "apple",      0,      1001,   101.0f, 
1001L,  50e+2,  new DataByteArray("apple"),     m1,     null,   null);
+               addResultRow(resultTable, "hadoop",     0,      1000,   0.0f,   
1002L,  52e+2,  new DataByteArray("hadoop"),m1, null,   null);
+               addResultRow(resultTable, "pig",        0,      1001,   52.0f,  
1001L,  50e+2,  new DataByteArray("pig"),       m1,     null,   null);
+               addResultRow(resultTable, "zebra",      0,      -1,             
3.25f,  1000L,  50e+2,  new DataByteArray("zebra"),     m1,     null,   null);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(numbRows, table1.length);
+       }
+       
+       @Test
+       public void test_union_empty_middle_table() throws ExecException, 
IOException {
+               //
+               // Test sorted union with three tables and middle table is empty
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby2 = "sort2 = ORDER table2 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby2);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               // Filter to remove all rows in table
+               String filter = "filtersort2 = FILTER sort2 BY e == 
'teststring' ;";
+               
+               pigServer.registerQuery(filter);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("sort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort2 = newPath.toString() + "2";
+               pigJob = pigServer.store("filtersort2", pathSort2, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE2_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + "4";
+               pigJob = pigServer.store("sort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort2 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str3,str4',
 'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, "Apple",      0,      1001,   100.0f, 
1003L,  50e+2,  new DataByteArray("Apple"),     m1,     null,   null);
+               addResultRow(resultTable, "Hadoop",     0,      1002,   28.0f,  
1000L,  50e+2,  new DataByteArray("Hadoop"),m1, null,   null);
+               addResultRow(resultTable, "Pig",        0,      1001,   50.0f,  
1000L,  50e+2,  new DataByteArray("Pig"),       m1,     null,   null);
+               addResultRow(resultTable, "Zebra",      0,      5,              
-3.25f, 1001L,  51e+2,  new DataByteArray("Zebra"),     m1,     null,   null);
+               
+               addResultRow(resultTable, "a 1",        2,      null,   null,   
null,   null,   null,   null,   "Reno",         "Nevada");
+               addResultRow(resultTable, "a 2",        2,      null,   null,   
null,   null,   null,   null,   "Dallas",       "Texas");
+               addResultRow(resultTable, "a 3",        2,      null,   null,   
null,   null,   null,   null,   "Phoenix",      "Arizona");
+               addResultRow(resultTable, "a 4",        2,      null,   null,   
null,   null,   null,   null,   "New York",     "New York");
+               
+               addResultRow(resultTable, "a 5",        2,      null,   null,   
null,   null,   null,   null,   "Las Vegas","Nevada");
+               addResultRow(resultTable, "a 6",        2,      null,   null,   
null,   null,   null,   null,   "Santa Cruz","California");
+               addResultRow(resultTable, "a 7",        2,      null,   null,   
null,   null,   null,   null,   "San Jose",     "California");
+               addResultRow(resultTable, "a 8",        2,      null,   null,   
null,   null,   null,   null,   "Cupertino","California");
+               
+               addResultRow(resultTable, "apple",      0,      1001,   101.0f, 
1001L,  50e+2,  new DataByteArray("apple"),     m1,     null,   null);
+               addResultRow(resultTable, "hadoop",     0,      1000,   0.0f,   
1002L,  52e+2,  new DataByteArray("hadoop"),m1, null,   null);
+               addResultRow(resultTable, "pig",        0,      1001,   52.0f,  
1001L,  50e+2,  new DataByteArray("pig"),       m1,     null,   null);
+               addResultRow(resultTable, "zebra",      0,      -1,             
3.25f,  1000L,  50e+2,  new DataByteArray("zebra"),     m1,     null,   null);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(numbRows, table1.length + table4.length);
+       }
+       
+       @Test
+       public void test_union_empty_many_table() throws ExecException, 
IOException {
+               //
+               // Test sorted union with three tables with left and right 
table empty
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby2 = "sort2 = ORDER table2 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby2);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               // Filter to remove all rows in table
+               String filter1 = "filtersort1 = FILTER sort1 BY e == 
'teststring' ;";
+               pigServer.registerQuery(filter1);
+               
+               // Filter to remove all rows in table
+               String filter4 = "filtersort4 = FILTER sort4 BY e == 
'teststring' ;";
+               pigServer.registerQuery(filter4);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("filtersort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort2 = newPath.toString() + "2";
+               pigJob = pigServer.store("sort2", pathSort2, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE2_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + "4";
+               pigJob = pigServer.store("filtersort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort2 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str3,str4',
 'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, "beige",      1,      2000,   33.0f,  
1006L,  52e+2,  new DataByteArray("beige"),     m2, null, null);
+               addResultRow(resultTable, "brown",      1,      1001,   53.0f,  
1001L,  52e+2,  new DataByteArray("brown"),     m2, null, null);
+               addResultRow(resultTable, "gray",       1,      1001,   50.0f,  
1008L,  52e+2,  new DataByteArray("gray"),      m2, null, null);
+               addResultRow(resultTable, "green",      1,      15,             
56.0f,  1004L,  50e+2,  new DataByteArray("green"),     m2, null, null);
+               
+               addResultRow(resultTable, "orange",     1,      -1,             
-99.0f, 1002L,  51e+2,  new DataByteArray("orange"),m2, null, null);
+               addResultRow(resultTable, "purple",     1,      1001,   102.0f, 
1001L,  52e+2,  new DataByteArray("purple"),m2, null, null);
+               addResultRow(resultTable, "white",      1,      1001,   100.0f, 
1003L,  55e+2,  new DataByteArray("white"),     m2, null, null);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(numbRows, table2.length);
+       }
+       
+       @Test
+       public void test_union_empty_all_table() throws ExecException, 
IOException {
+               //
+               // Test sorted union with two tables and both are empty
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               // Filter to remove all rows in table
+               String filter1 = "filtersort1 = FILTER sort1 BY e == 
'teststring' ;";
+               pigServer.registerQuery(filter1);
+               
+               // Filter to remove all rows in table
+               String filter4 = "filtersort4 = FILTER sort4 BY e == 
'teststring' ;";
+               pigServer.registerQuery(filter4);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("filtersort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + "4";
+               pigJob = pigServer.store("filtersort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str3,str4',
 'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               int numbRows = printTable("records1");
+               
+               Assert.assertEquals(numbRows, 0);
+       }
+       
+       @Test
+       public void test_pig_statements() throws ExecException, IOException {
+               //
+               // Test sorted union with two tables after pig filter and pig 
foreach statements
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               // Table after pig filter
+               String filter1 = "filtersort1 = FILTER sort1 BY a == 1001 ;";
+               pigServer.registerQuery(filter1);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + "1";
+               pigJob = pigServer.store("filtersort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + "4";
+               pigJob = pigServer.store("sort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String queryLoad = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str3,str4',
 'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad);
+               
+               pigServer.registerQuery(queryLoad);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, "Apple",      0,      1001,   100.0f, 
1003L,  50e+2,  new DataByteArray("Apple"),     m1,     null,   null);
+               addResultRow(resultTable, "Pig",        0,      1001,   50.0f,  
1000L,  50e+2,  new DataByteArray("Pig"),       m1,     null,   null);
+               addResultRow(resultTable, "a 1",        1,      null,   null,   
null,   null,   null,   null,   "Reno",         "Nevada");
+               addResultRow(resultTable, "a 2",        1,      null,   null,   
null,   null,   null,   null,   "Dallas",       "Texas");
+               
+               addResultRow(resultTable, "a 3",        1,      null,   null,   
null,   null,   null,   null,   "Phoenix",      "Arizona");
+               addResultRow(resultTable, "a 4",        1,      null,   null,   
null,   null,   null,   null,   "New York",     "New York");
+               addResultRow(resultTable, "a 5",        1,      null,   null,   
null,   null,   null,   null,   "Las Vegas","Nevada");
+               addResultRow(resultTable, "a 6",        1,      null,   null,   
null,   null,   null,   null,   "Santa Cruz","California");
+               
+               addResultRow(resultTable, "a 7",        1,      null,   null,   
null,   null,   null,   null,   "San Jose",     "California");
+               addResultRow(resultTable, "a 8",        1,      null,   null,   
null,   null,   null,   null,   "Cupertino","California");
+               addResultRow(resultTable, "apple",      0,      1001,   101.0f, 
1001L,  50e+2,  new DataByteArray("apple"),     m1,     null,   null);
+               addResultRow(resultTable, "pig",        0,      1001,   52.0f,  
1001L,  50e+2,  new DataByteArray("pig"),       m1,     null,   null);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(12, numbRows);
+       }
+       
+       @Test
+       public void test_union_as_input() throws ExecException, IOException {
+               //
+               // Test sorted union with two tables, one of the tables is from 
previous sorted union
+               //
+               
+               // Sort tables
+               String orderby1 = "sort1 = ORDER table1 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby1);
+               
+               String orderby2 = "sort2 = ORDER table2 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby2);
+               
+               String orderby4 = "sort4 = ORDER table4 BY " + "e" + " ;";
+               pigServer.registerQuery(orderby4);
+               
+               Path newPath = new Path(getCurrentMethodName());
+               
+               // Store sorted tables
+               String pathSort1 = newPath.toString() + ++fileId;
+               pigJob = pigServer.store("sort1", pathSort1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort2 = newPath.toString() + ++fileId;
+               pigJob = pigServer.store("sort2", pathSort2, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE2_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               String pathSort4 = newPath.toString() + ++fileId;
+               pigJob = pigServer.store("sort4", pathSort4, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE4_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               // Create first sorted union
+               String queryLoad1 = "records1 = LOAD '"
+               + pathSort1 + ","
+               + pathSort2
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,a,b,c,d,f,m1', 'sorted');";
+               pigServer.registerQuery(queryLoad1);
+               
+               // Add sort info to union table before store
+               String sortunion = "sortrecords1 = ORDER records1 BY " + "e" + 
" ;";
+               pigServer.registerQuery(sortunion);
+               
+               // Store union table
+               String pathUnion1 = newPath.toString() + ++fileId;
+               pigJob = pigServer.store("sortrecords1", pathUnion1, 
TableStorer.class.getCanonicalName() +
+                       "('" + TABLE1_STORAGE + "')");
+               Assert.assertNull(pigJob.getException());
+               
+               // Create second sorted union
+               String queryLoad2 = "records2 = LOAD '"
+               + pathUnion1 + ","
+               + pathSort4
+               +       "' USING 
org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,str3,str4', 
'sorted');";
+               
+               System.out.println("queryLoad: " + queryLoad2);
+               pigServer.registerQuery(queryLoad2);
+               
+               // Verify union table
+               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               
+               addResultRow(resultTable, "Apple",      0,      1001,   100.0f, 
null,   null);
+               addResultRow(resultTable, "Hadoop",     0,      1002,   28.0f,  
null,   null);
+               addResultRow(resultTable, "Pig",        0,      1001,   50.0f,  
null,   null);
+               addResultRow(resultTable, "Zebra",      0,      5,              
-3.25f, null,   null);
+               
+               addResultRow(resultTable, "a 1",        1,      null,   null,   
"Reno",         "Nevada");
+               addResultRow(resultTable, "a 2",        1,      null,   null,   
"Dallas",       "Texas");
+               addResultRow(resultTable, "a 3",        1,      null,   null,   
"Phoenix",      "Arizona");
+               addResultRow(resultTable, "a 4",        1,      null,   null,   
"New York",     "New York");
+               
+               addResultRow(resultTable, "a 5",        1,      null,   null,   
"Las Vegas","Nevada");
+               addResultRow(resultTable, "a 6",        1,      null,   null,   
"Santa Cruz","California");
+               addResultRow(resultTable, "a 7",        1,      null,   null,   
"San Jose","California");
+               addResultRow(resultTable, "a 8",        1,      null,   null,   
"Cupertino","California");
+               
+               addResultRow(resultTable, "apple",      0,      1001,   101.0f, 
null,   null);
+               addResultRow(resultTable, "beige",      0,      2000,   33.0f,  
null,   null);
+               addResultRow(resultTable, "brown",      0,      1001,   53.0f,  
null,   null);
+               addResultRow(resultTable, "gray",       0,      1001,   50.0f,  
null,   null);
+               
+               addResultRow(resultTable, "green",      0,      15,             
56.0f,  null,   null);
+               addResultRow(resultTable, "hadoop",     0,      1000,   0.0f,   
null,   null);
+               addResultRow(resultTable, "orange",     0,      -1,             
-99.0f, null,   null);
+               addResultRow(resultTable, "pig",        0,      1001,   52.0f,  
null,   null);
+               
+               addResultRow(resultTable, "purple",     0,      1001,   102.0f, 
null,   null);
+               addResultRow(resultTable, "white",      0,      1001,   100.0f, 
null,   null);
+               addResultRow(resultTable, "zebra",      0,      -1,             
3.25f,  null,   null);
+               
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records2");
+               int numbRows = verifyTable(resultTable, 0, it);
+               
+               Assert.assertEquals(23, numbRows);
+       }
+       
+       /**
+        * Verify union output table with expected results.
+        * <p>
+        * Actual rows are matched against the expected rows key group by key group:
+        * the expected position only ever moves forward to the next distinct key, and
+        * a row is accepted when it equals any expected row carrying the current key.
+        * Every mismatch is reported as an assertion failure.
+        *
+        * @param resultTable expected rows, grouped by the value in {@code keyColumn}
+        * @param keyColumn   index of the primary sort key column
+        * @param it          iterator over the actual rows
+        * @return the number of rows read from {@code it}
+        * @throws IOException propagated from reading fields of a tuple
+        */
+       private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
+               // With nothing expected there is no first key to read; any actual row is a failure.
+               if (resultTable.isEmpty()) {
+                       Assert.assertFalse("Table comparison error for row : 0 - no rows expected", it.hasNext());
+                       return 0;
+               }
+
+               int numbRows = 0;
+               int index = 0;  // first expected row of the current key group
+               Object value = resultTable.get(index).get(keyColumn);  // get value of primary key
+
+               while (it.hasNext()) {
+                       Tuple rowValues = it.next();
+                       Object rowKey = rowValues.get(keyColumn);
+
+                       // If last primary sort key does not match then move on to the next expected key
+                       if (! compareObj(value, rowKey)) {
+                               int subIndex = index + 1;
+                               while (subIndex < resultTable.size()) {
+                                       if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
+                                               index = subIndex;
+                                               value = resultTable.get(index).get(keyColumn);
+                                               break;
+                                       }
+                                       ++subIndex;
+                               }
+                               Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+                                       + rowKey, value, rowKey);
+                       }
+
+                       // Search for matching row with this primary key
+                       String noRowMessage = "Table comparison error for row : " + numbRows + " - no matching row found for : "
+                               + rowKey;
+                       boolean found = false;
+                       int subIndex = index;
+                       while (!found && subIndex < resultTable.size()) {
+                               ArrayList<Object> resultRow = resultTable.get(subIndex);
+                               // Leaving the current key group means the row has no match.
+                               Assert.assertEquals(noRowMessage, value, resultRow.get(keyColumn));
+                               found = compareRow(rowValues, resultRow);
+                               ++subIndex;
+                       }
+                       // Reaching the end of the expected table is a comparison failure,
+                       // not an out-of-range read.
+                       Assert.assertTrue(noRowMessage, found);
+                       ++numbRows;
+               }
+               Assert.assertEquals(resultTable.size(), numbRows);  // verify expected row count
+               return numbRows;
+       }
+       
+       /**
+        * Compare an actual table row with an expected row, column by column.
+        * The two rows must have the same number of columns; this is asserted
+        * before any value is compared.
+        *
+        * @return true when every column matches according to compareObj
+        */
+       private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+               Assert.assertEquals(resultRow.size(), rowValues.size());
+               for (int column = 0; column < rowValues.size(); ++column) {
+                       boolean sameValue = compareObj(rowValues.get(column), resultRow.get(column));
+                       if (!sameValue) {
+                               return false;  // first differing column decides
+                       }
+               }
+               return true;
+       }
+       
+       /**
+        * Compare two table values, treating two nulls as equal.
+        *
+        * @return true when both are null, or when object1.equals(object2)
+        */
+       private boolean compareObj(Object object1, Object object2) {
+               if (object1 == null) {
+                       return object2 == null;
+               }
+               return object1.equals(object2);
+       }
+       
+       /**
+        * Add a row to the expected results table.
+        *
+        * @param resultTable table the new row is appended to
+        * @param values      column values of the row, in column order
+        */
+       private void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object ... values) {
+               ArrayList<Object> row = new ArrayList<Object>(values.length);
+               for (Object cell : values) {
+                       row.add(cell);
+               }
+               resultTable.add(row);
+       }
+       
+       /**
+        * Print every row of a Pig table to standard output (for debugging).
+        *
+        * @param tablename alias passed to pigServer.openIterator
+        * @return the number of rows printed
+        */
+       private int printTable(String tablename) throws IOException {
+               int rowCount = 0;
+               for (Iterator<Tuple> rows = pigServer.openIterator(tablename); rows.hasNext(); ) {
+                       Tuple row = rows.next();
+                       ++rowCount;
+                       System.out.println();  // blank line between rows
+                       for (int column = 0; column < row.size(); ++column) {
+                               System.out.println("DEBUG: " + tablename + " RowValue.get(" + column + ") = " + row.get(column));
+                       }
+               }
+               System.out.println("\nRow count : " + rowCount);
+               return rowCount;
+       }
+       
+       /**
+        * Return the name of the routine that called getCurrentMethodName, qualified
+        * with its declaring class (the same "class.method" text a printed stack
+        * frame shows after "at").
+        * <p>
+        * The caller is read from the structured stack trace rather than by parsing
+        * printed stack-trace text, so the result does not depend on the printed frame
+        * layout, the line separator or the platform charset.
+        *
+        * @return the caller's class-qualified method name, cut at the first '<' if any
+        * @throws IllegalStateException if the stack trace holds no caller frame
+        */
+       private String getCurrentMethodName() {
+               // Frame 0 is this method; frame 1 is the routine that called it.
+               StackTraceElement[] frames = new Throwable().getStackTrace();
+               if (frames.length < 2) {
+                       throw new IllegalStateException("No caller frame available for getCurrentMethodName");
+               }
+               StackTraceElement caller = frames[1];
+               String name = caller.getClassName() + "." + caller.getMethodName();
+
+               // The earlier text-based parsing split on '<', so special names such as
+               // "<init>" were dropped; keep that so returned names stay the same.
+               int specialStart = name.indexOf('<');
+               return specialStart >= 0 ? name.substring(0, specialStart) : name;
+       }
+       
+}


Reply via email to