Author: yanz
Date: Mon Jan  4 19:33:53 2010
New Revision: 895753

URL: http://svn.apache.org/viewvc?rev=895753&view=rev
Log:
PIG-1167: Hadoop file glob support (yanz)

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Jan  4 19:33:53 2010
@@ -52,6 +52,8 @@
 
   BUG FIXES
 
+    PIG-1167: Hadoop file glob support (yanz)
+
     PIG-1153: Record split exception fix (yanz)
 
     PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Mon Jan  4 19:33:53 2010
@@ -226,7 +226,7 @@
                                FileSystem fs = p.getFileSystem(jobConf);
                                FileStatus[] matches = fs.globStatus(p);
                                if (matches == null) {
-                                       LOG.warn("Input path does not exist: " + p);
+                                       throw new IOException("Input path does not exist: " + p);
                                }
                                else if (matches.length == 0) {
                                        LOG.warn("Input Pattern " + p + " matches 0 files");
@@ -293,33 +293,14 @@
                
                Projection projection;
 
-               if (!fileName.contains(",")) { // one table;
-                       org.apache.hadoop.zebra.schema.Schema tschema = BasicTable.Reader.getSchema(new Path(fileName), jobConf);
-                       try {
-                               projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
-                               projectionSchema = projection.getProjectionSchema();
-                       } catch (ParseException e) {
-                               throw new IOException("Schema parsing failed : "+e.getMessage());
-                       }
-               } else { // table union;
-                       org.apache.hadoop.zebra.schema.Schema unionSchema = new org.apache.hadoop.zebra.schema.Schema();
-                       for (Path p : paths) {
-                               org.apache.hadoop.zebra.schema.Schema schema = BasicTable.Reader.getSchema(p, jobConf);
-                               try {
-                                       unionSchema.unionSchema(schema);
-                               } catch (ParseException e) {
-                                       throw new IOException(e.getMessage());
-                               }
-                       }
-
-                       try {
-                               projection = new org.apache.hadoop.zebra.types.Projection(unionSchema, TableInputFormat.getProjection(jobConf));
-                               projectionSchema = projection.getProjectionSchema();
-                       } catch (ParseException e) {
-                               throw new IOException("Schema parsing failed : "+e.getMessage());
-                       }
-               }
-
+    org.apache.hadoop.zebra.schema.Schema tschema = TableInputFormat.getSchema(jobConf);
+    try {
+      projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
+      projectionSchema = projection.getProjectionSchema();
+    } catch (ParseException e) {
+      throw new IOException("Schema parsing failed : "+e.getMessage());
+    }
+
                if (projectionSchema == null) {
                        throw new IOException("Cannot determine table projection schema");
                }
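
For context, the hunk above relies on the FileSystem.globStatus() contract:
a null return means a non-glob input path that does not exist, while an
empty array means a glob pattern that matched zero files. The patch turns
the first case into a hard error and keeps the second as a warning. Below
is a minimal standalone sketch of that distinction; the class name is
illustrative and not part of the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not part of the patch: demonstrates the
// globStatus() semantics that TableLoader now enforces.
public class GlobCheckSketch {
  public static FileStatus[] expandGlob(String pattern, Configuration conf) throws IOException {
    Path p = new Path(pattern);
    FileSystem fs = p.getFileSystem(conf);
    FileStatus[] matches = fs.globStatus(p);
    if (matches == null) {
      // Missing input path: fail fast, as the patched TableLoader does.
      throw new IOException("Input path does not exist: " + p);
    } else if (matches.length == 0) {
      // Pattern matched nothing: worth a warning, not an error.
      System.err.println("Input Pattern " + p + " matches 0 files");
    }
    return matches;
  }
}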

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=895753&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java Mon Jan  4 19:33:53 2010
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestGlobTableLoader {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+  private static String zebraJar;
+  private static String whichCluster;
+  private static FileSystem fs;
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    // if whichCluster is not defined, or defined something other than
+    // "realCluster" or "miniCluster", set it to "miniCluster"
+    if (System.getProperty("whichCluster") == null
+        || ((!System.getProperty("whichCluster")
+            .equalsIgnoreCase("realCluster")) && (!System.getProperty(
+            "whichCluster").equalsIgnoreCase("miniCluster")))) {
+      System.setProperty("whichCluster", "miniCluster");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      pathTable = new Path("/user/" + System.getenv("USER")
+          + "/TestMapTableLoader");
+      removeDir(pathTable);
+      fs = pathTable.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        pathTable = new Path(fs.getWorkingDirectory()
+            + "/TestMapTableLoader1");
+        removeDir(pathTable);
+        System.out.println("path1 =" + pathTable);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m1:map(string)", "[m1#{a}]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+
+        try {
+          inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        } catch (Exception e) {
+          System.out.println(e.getMessage());
+        }
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+  public static void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr " + outPath.toString();
+    } else {
+      command = "rm -rf " + outPath.toString();
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    int exitVal = -1;
+    try {
+      exitVal = proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+  }
+
+  // @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("m1#{b}");
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    // HashMap<String, Object> mapval;
+    while (!scanner.atEnd()) {
+      scanner.getKey(key);
+      // Assert.assertEquals(key, new BytesWritable("key0".getBytes()));
+      scanner.getValue(value);
+      System.out.println("key = " + key + " value = " + value);
+
+      // mapval = (HashMap<String, Object>) value.get(0);
+      // Assert.assertEquals("x", mapval.get("a"));
+      // Assert.assertEquals(null, mapval.get("b"));
+      // Assert.assertEquals(null, mapval.get("c"));
+      scanner.advance();
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
+    pathTable = new Path("/user/" + System.getenv("USER")
+        + "/{TestMapTableLoader1}");
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+  }
+}
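
The testReader() case above loads through a brace glob, '{TestMapTableLoader1}',
rather than a literal path. The snippet below illustrates the glob forms that
Hadoop's globStatus(), and therefore TableLoader, accepts; the paths are
hypothetical and do not exist in the test.

import org.apache.hadoop.fs.Path;

// Illustrative only; none of these paths belong to the test above.
public class GlobFormsSketch {
  public static void main(String[] args) {
    Path single   = new Path("/user/alice/{table1}");        // one alternative, as in testReader()
    Path union    = new Path("/user/alice/{table1,table2}"); // either of two tables
    Path wildcard = new Path("/user/alice/table*");          // any suffix
    Path range    = new Path("/user/alice/table[0-9]");      // character class
    System.out.println(single + ", " + union + ", " + wildcard + ", " + range);
  }
}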

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=895753&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Mon Jan  4 19:33:53 2010
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveMultiTableGlob {
+       
+       final static int NUMB_TABLE = 10;               // number of tables for stress test
+       final static int NUMB_TABLE_ROWS = 5;   // number of rows for each table
+       
+       final static String TABLE_SCHEMA = "int1:int,str1:string,byte1:bytes";
+       final static String TABLE_STORAGE = "[int1,str1,byte1]";
+       
+       static int fileId = 0;
+       static int sortId = 0;
+       
+       protected static ExecType execType = ExecType.MAPREDUCE;
+       private static MiniCluster cluster;
+       protected static PigServer pigServer;
+       protected static ExecJob pigJob;
+       
+       private static ArrayList<Path> pathTables;
+       private static int totalTableRows =0;
+       
+       private static Configuration conf;
+       private static FileSystem fs;
+       
+       private static String zebraJar;
+       private static String whichCluster;
+       
+       @BeforeClass
+       public static void setUp() throws Exception {
+               if (System.getProperty("hadoop.log.dir") == null) {
+                       String base = new File(".").getPath(); // getAbsolutePath();
+                       System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+               }
+
+               // if whichCluster is not defined, or defined as something other than
+               // "realCluster" or "miniCluster", set it to "miniCluster"
+               if (System.getProperty("whichCluster") == null
+                               || ((!System.getProperty("whichCluster")
+                                               .equalsIgnoreCase("realCluster")) && (!System.getProperty(
+                                               "whichCluster").equalsIgnoreCase("miniCluster")))) {
+                       System.setProperty("whichCluster", "miniCluster");
+                       whichCluster = System.getProperty("whichCluster");
+               } else {
+                       whichCluster = System.getProperty("whichCluster");
+               }
+               
+               System.out.println("cluster: " + whichCluster);
+               if (whichCluster.equalsIgnoreCase("realCluster")
+                               && System.getenv("HADOOP_HOME") == null) {
+                       System.out.println("Please set HADOOP_HOME");
+                       System.exit(0);
+               }
+               
+               conf = new Configuration();
+               
+               if (whichCluster.equalsIgnoreCase("realCluster")
+                               && System.getenv("USER") == null) {
+                       System.out.println("Please set USER");
+                       System.exit(0);
+               }
+               zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+               File file = new File(zebraJar);
+               if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+                       System.out.println("Please put zebra.jar at hadoop_home/../jars");
+                       System.exit(0);
+               }
+               
+               if (whichCluster.equalsIgnoreCase("realCluster")) {
+                       pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
+                       pigServer.registerJar(zebraJar);
+
+                       pathTables = new ArrayList<Path>();
+                       for (int i=0; i<NUMB_TABLE; ++i) {
+                               Path pathTable = new Path("/user/" + System.getenv("USER") + "/TestOderPerserveMultiTable" + i);
+                               pathTables.add(pathTable);
+                               removeDir(pathTable);
+                       }
+                       fs = pathTables.get(0).getFileSystem(conf);
+               }
+               
+               if (whichCluster.equalsIgnoreCase("miniCluster")) {
+                       if (execType == ExecType.MAPREDUCE) {
+                               cluster = MiniCluster.buildCluster();
+                               pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+                               fs = cluster.getFileSystem();
+
+                               pathTables = new ArrayList<Path>();
+                               for (int i=0; i<NUMB_TABLE; ++i) {
+                                       Path pathTable = new Path(fs.getWorkingDirectory() + "/TestOderPerserveMultiTable" + i);
+                                       pathTables.add(pathTable);
+                                       removeDir(pathTable);
+                               }
+                       } else {
+                               pigServer = new PigServer(ExecType.LOCAL);
+                       }
+               }
+               
+               // Create tables
+               for (int i=0; i<NUMB_TABLE; ++i) {
+                       // Create table data
+                       Object[][] table = new Object[NUMB_TABLE_ROWS][3];  // three columns
+
+                       for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+                               table[j][0] = i;
+                               table[j][1] = new String("string" + j);
+                               table[j][2] = new DataByteArray("byte" + (NUMB_TABLE_ROWS - j));
+                               ++totalTableRows;
+                       }
+                       // Create table
+                       createTable(pathTables.get(i), TABLE_SCHEMA, TABLE_STORAGE, table);
+
+                       // Load table
+                       String query = "table" + i + " = LOAD '" + pathTables.get(i).toString() +
+                                       "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+                       pigServer.registerQuery(query);
+               }
+       }
+       
+       private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+                       throws IOException {
+               //
+               // Create table from tableData array
+               //
+               BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+
+               Schema schema = writer.getSchema();
+               Tuple tuple = TypesUtils.createTuple(schema);
+               TableInserter inserter = writer.getInserter("ins", false);
+
+               for (int i = 0; i < tableData.length; ++i) {
+                       TypesUtils.resetTuple(tuple);
+                       for (int k = 0; k < tableData[i].length; ++k) {
+                               tuple.set(k, tableData[i][k]);
+                               System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+                       }
+                       inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+               }
+               inserter.close();
+               writer.close();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               pigServer.shutdown();
+       }
+       
+       public static void removeDir(Path outPath) throws IOException {
+               String command = null;
+               if (whichCluster.equalsIgnoreCase("realCluster")) {
+                       command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr " + outPath.toString();
+               } else {
+                       command = "rm -rf " + outPath.toString();
+               }
+               Runtime runtime = Runtime.getRuntime();
+               Process proc = runtime.exec(command);
+               int exitVal = -1;
+               try {
+                       exitVal = proc.waitFor();
+               } catch (InterruptedException e) {
+                       System.err.println(e);
+               }
+       }
+       
+       private Iterator<Tuple> testOrderPreserveUnion(ArrayList<String> inputTables, String sortkey, String columns)
+                       throws IOException {
+               //
+               // Test order preserving union from input tables and provided output columns
+               //
+               Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+
+               Path newPath = new Path(getCurrentMethodName());
+               ArrayList<String> pathList = new ArrayList<String>();
+
+               // Sort and store each of the input tables
+               for (int i=0; i<inputTables.size(); ++i) {
+                       String tablename = inputTables.get(i);
+                       String sortName = "sort" + ++sortId;
+
+                       // Sort tables
+                       String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;";
+                       pigServer.registerQuery(orderby);
+
+                       String sortPath = new String(newPath.toString() + ++fileId);  // increment fileId suffix
+
+                       // Store sorted tables
+                       pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName() +
+                               "('" + TABLE_STORAGE + "')");
+                       Assert.assertNull(pigJob.getException());
+
+                       pathList.add(sortPath);  // add table path to list
+               }
+
+               // Build a brace glob over the numbered sort paths, e.g. path{1,2,3}
+               String paths = new String();
+               paths += newPath.toString() + "{";
+               fileId = 0;
+               for (String path:pathList)
+                       paths += ++fileId + ",";
+               paths = paths.substring(0, paths.lastIndexOf(","));  // remove trailing comma
+               paths += "}";
+
+               String queryLoad = "records1 = LOAD '" + paths
+                       + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+
+               System.out.println("queryLoad: " + queryLoad);
+               pigServer.registerQuery(queryLoad);
+
+               // Return iterator
+               Iterator<Tuple> it1 = pigServer.openIterator("records1");
+               return it1;
+       }
+       
+       @Test
+       public void test_sorted_union_multi_table() throws ExecException, IOException {
+               //
+               // Test sorted union
+               //
+
+               // Create input tables for order preserving union
+               ArrayList<String> inputTables = new ArrayList<String>();  // Input tables
+               for (int i=0; i<NUMB_TABLE; ++i) {
+                       inputTables.add("table" + i);  // add input table
+               }
+
+               // Test with input tables and provided output columns
+               testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+
+               // Create results table for verification
+               ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+               for (int i=0; i<NUMB_TABLE; ++i) {
+                       for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+                               ArrayList<Object> resultRow = new ArrayList<Object>();
+
+                               resultRow.add(i);                                                  // int1
+                               resultRow.add(new String("string" + j));                           // str1
+                               resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j)));  // byte1
+
+                               resultTable.add(resultRow);
+                       }
+               }
+
+               // Verify union table
+               Iterator<Tuple> it = pigServer.openIterator("records1");
+               int numbRows = verifyTable(resultTable, 0, it);
+
+               Assert.assertEquals(totalTableRows, numbRows);
+
+               // Print table
+               //printTable("records1");
+       }
+       
+       /**
+        * Verify union output table with expected results
+        *
+        */
+       private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
+               int numbRows = 0;
+               int index = 0;
+               Object value = resultTable.get(index).get(keyColumn);  // get value of primary key
+
+               while (it.hasNext()) {
+                       Tuple rowValues = it.next();
+
+                       // If last primary sort key does not match then search for next matching key
+                       if (! compareObj(value, rowValues.get(keyColumn))) {
+                               int subIndex = index + 1;
+                               while (subIndex < resultTable.size()) {
+                                       if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
+                                               index = subIndex;
+                                               value = resultTable.get(index).get(keyColumn);
+                                               break;
+                                       }
+                                       ++subIndex;
+                               }
+                               Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+                                       + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+                       }
+                       // Search for matching row with this primary key
+                       int subIndex = index;
+
+                       while (subIndex < resultTable.size()) {
+                               // Compare row
+                               ArrayList<Object> resultRow = resultTable.get(subIndex);
+                               if ( compareRow(rowValues, resultRow) )
+                                       break; // found matching row
+                               ++subIndex;
+                               Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : "
+                                       + rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn));
+                       }
+                       ++numbRows;
+               }
+               Assert.assertEquals(resultTable.size(), numbRows);  // verify expected row count
+               return numbRows;
+       }
+       
+       /**
+        * Compare table rows
+        * 
+        */
+       private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+               boolean result = true;
+               Assert.assertEquals(resultRow.size(), rowValues.size());
+               for (int i = 0; i < rowValues.size(); ++i) {
+                       if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
+                               result = false;
+                               break;
+                       }
+               }
+               return result;
+       }
+       
+       /**
+        * Compare table values
+        * 
+        */
+       private boolean compareObj(Object object1, Object object2) {
+               if (object1 == null) {
+                       if (object2 == null)
+                               return true;
+                       else
+                               return false;
+               } else if (object1.equals(object2))
+                       return true;
+               else
+                       return false;
+       }
+       
+       /**
+        * Print Pig Table (for debugging)
+        * 
+        */
+       private int printTable(String tablename) throws IOException {
+               Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+               int numbRows = 0;
+               while (it1.hasNext()) {
+                       Tuple RowValue1 = it1.next();
+                       ++numbRows;
+                       System.out.println();
+                       for (int i = 0; i < RowValue1.size(); ++i)
+                               System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
+               }
+               System.out.println("\nRow count : " + numbRows);
+               return numbRows;
+       }
+       
+       /**
+        * Return the name of the routine that called getCurrentMethodName
+        * 
+        */
+       private String getCurrentMethodName() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               PrintWriter pw = new PrintWriter(baos);
+               (new Throwable()).printStackTrace(pw);
+               pw.flush();
+               String stackTrace = baos.toString();
+               pw.close();
+               
+               StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+               tok.nextToken(); // 'java.lang.Throwable'
+               tok.nextToken(); // 'at ...getCurrentMethodName'
+               String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+               // Parse line 3
+               tok = new StringTokenizer(l.trim(), " <(");
+               String t = tok.nextToken(); // 'at'
+               t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+               return t;
+       }
+       
+}
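
testOrderPreserveUnion() above collapses the sorted outputs into a single
brace-glob path such as path{1,2,3} before the LOAD. Below is a minimal
sketch of the same construction using a StringBuilder; the class name and
example path are illustrative only.

// Minimal sketch, not part of the test: builds the same "base{1,2,...,N}"
// glob that testOrderPreserveUnion() assembles by string concatenation.
public class BraceGlobSketch {
  public static String braceGlob(String base, int numTables) {
    StringBuilder glob = new StringBuilder(base).append('{');
    for (int id = 1; id <= numTables; ++id) {
      if (id > 1) glob.append(',');
      glob.append(id);
    }
    return glob.append('}').toString();
  }

  public static void main(String[] args) {
    System.out.println(braceGlob("/tmp/test", 3));  // prints /tmp/test{1,2,3}
  }
}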

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java Mon Jan  4 19:33:53 2010
@@ -455,7 +455,7 @@
                } finally {
                        //System.out.println(getStackTrace(exception));
                        Assert.assertNotNull(exception);
-                       Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't exist"));
+                       Assert.assertTrue(getStackTrace(exception).contains("Input path does not exist: "));
                }
        }
        
@@ -465,6 +465,7 @@
                // Test sorted union error handling when one of the table paths is invalid (Negative test)
                //
                IOException exception = null;
+               String pathSort2 = null;
                
                try {
                        // Sort tables
@@ -479,7 +480,7 @@
                                "('" + TABLE1_STORAGE + "')");
                        Assert.assertNull(pigJob.getException());
                        
-                       String pathSort2 = newPath.toString() + "2";  // invalid path
+                       pathSort2 = newPath.toString() + "2";  // invalid path
                        
                        String queryLoad = "records1 = LOAD '"
                        + pathSort1 + ","

